Posted to commits@beam.apache.org by re...@apache.org on 2019/03/31 23:24:50 UTC

[beam] branch master updated: Merge pull request #7840: [BEAM-6602] BigQueryIO.write natively understands Beam schemas

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 90bf972  Merge pull request #7840:  [BEAM-6602] BigQueryIO.write natively understands Beam schemas
90bf972 is described below

commit 90bf97252500317fca287182dcb8a8e96f3d5878
Author: reuvenlax <re...@google.com>
AuthorDate: Sun Mar 31 16:24:33 2019 -0700

    Merge pull request #7840:  [BEAM-6602] BigQueryIO.write natively understands Beam schemas
---
 ...DefaultCoderCloudObjectTranslatorRegistrar.java |   2 -
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 113 +++++++++++---
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    | 138 ++++++++++--------
 .../sdk/io/gcp/bigquery/DynamicDestinations.java   |   6 +-
 .../gcp/bigquery/DynamicDestinationsHelpers.java   |  30 +++-
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 162 +++++++++++++++++----
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     |  20 +--
 7 files changed, 340 insertions(+), 131 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index d43106b..25d6df9 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
@@ -98,7 +97,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           KeyPrefixCoder.class,
           RandomAccessDataCoder.class,
           StringUtf8Coder.class,
-          TableDestinationCoder.class,
           TableDestinationCoderV2.class,
           TableRowJsonCoder.class,
           TextualIntegerCoder.class,
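
(TableDestinationCoder is dropped from the Dataflow registrar here, consistent with ConstantTableDestinations switching to TableDestinationCoderV2 in DynamicDestinationsHelpers further down.)
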
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 98b114f..85cdba4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -201,7 +201,16 @@ import org.slf4j.LoggerFactory;
  * BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
  *
  * <pre>{@code
- * class Quote { Instant timestamp; String exchange; String symbol; double price; }
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
  *
  * PCollection<Quote> quotes = ...
  *
@@ -223,6 +232,34 @@ import org.slf4j.LoggerFactory;
  * written to must already exist. Unbounded PCollections can only be written using {@link
  * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
  *
+ * <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on
+ * the input PCollection. Beam can also automatically format the input into a TableRow in this case,
+ * if no format function is provided. In the above example, the quotes PCollection has a schema that
+ * Beam infers from the Quote POJO. So the write could be done more simply as follows:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   {@literal @}SchemaCreate
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
+ *
+ * PCollection<Quote> quotes = ...
+ *
+ * quotes.apply(BigQueryIO
+ *     .<Quote>write()
+ *     .to("my-project:my_dataset.my_table")
+ *     .useBeamSchema()
+ *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ * }</pre>
+ *
  * <h3>Loading historical data into time-partitioned BigQuery tables</h3>
  *
  * <p>To load historical data into a time-partitioned BigQuery table, specify {@link
@@ -1331,6 +1368,7 @@ public class BigQueryIO {
         .setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION)
         .setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)
         .setOptimizeWrites(false)
+        .setUseBeamSchema(false)
         .build();
   }
 
@@ -1452,6 +1490,8 @@ public class BigQueryIO {
 
     abstract Boolean getOptimizeWrites();
 
+    abstract Boolean getUseBeamSchema();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -1511,6 +1551,8 @@ public class BigQueryIO {
 
       abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
 
+      abstract Builder<T> setUseBeamSchema(Boolean useBeamSchema);
+
       abstract Write<T> build();
     }
 
@@ -1618,7 +1660,6 @@ public class BigQueryIO {
 
     /** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
     public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
-      checkArgument(formatFunction != null, "formatFunction can not be null");
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
@@ -1826,10 +1867,20 @@ public class BigQueryIO {
      * BigQuery. Not enabled by default in order to maintain backwards compatibility.
      */
     @Experimental
-    public Write<T> withOptimizedWrites() {
+    public Write<T> optimizedWrites() {
       return toBuilder().setOptimizeWrites(true).build();
     }
 
+    /**
+     * If set, then the BigQuery table schema will be inferred from the input's Beam schema. If no
+     * formatFunction is set, then BigQueryIO will automatically turn the input records into
+     * TableRows that match the schema.
+     */
+    @Experimental
+    public Write<T> useBeamSchema() {
+      return toBuilder().setUseBeamSchema(true).build();
+    }
+
     @VisibleForTesting
     /** This method is for test usage only */
     public Write<T> withTestServices(BigQueryServices testServices) {
@@ -1910,19 +1961,6 @@ public class BigQueryIO {
               || getDynamicDestinations() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
-      checkArgument(
-          getFormatFunction() != null,
-          "A function must be provided to convert type into a TableRow. "
-              + "use BigQueryIO.Write.withFormatFunction to provide a formatting function.");
-
-      // Require a schema if creating one or more tables.
-      checkArgument(
-          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
-              || getJsonSchema() != null
-              || getDynamicDestinations() != null
-              || getSchemaFromView() != null,
-          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
       List<?> allToArgs =
           Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
       checkArgument(
@@ -1995,8 +2033,9 @@ public class BigQueryIO {
         // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
         if (getJsonTimePartitioning() != null) {
           dynamicDestinations =
-              new ConstantTimePartitioningDestinations(
-                  dynamicDestinations, getJsonTimePartitioning());
+              new ConstantTimePartitioningDestinations<>(
+                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+                  getJsonTimePartitioning());
         }
       }
       return expandTyped(input, dynamicDestinations);
@@ -2004,6 +2043,38 @@ public class BigQueryIO {
 
     private <DestinationT> WriteResult expandTyped(
         PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+      boolean optimizeWrites = getOptimizeWrites();
+      SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+      if (getUseBeamSchema()) {
+        checkArgument(input.hasSchema(), "useBeamSchema() requires the input to have a schema.");
+        optimizeWrites = true;
+        if (formatFunction == null) {
+          // If no format function set, then we will automatically convert the input type to a
+          // TableRow.
+          formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
+        }
+        // Infer the TableSchema from the input Beam schema.
+        TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
+        dynamicDestinations =
+            new ConstantSchemaDestinations<>(
+                dynamicDestinations,
+                StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+      } else {
+        // Require a schema if creating one or more tables.
+        checkArgument(
+            getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
+                || getJsonSchema() != null
+                || getDynamicDestinations() != null
+                || getSchemaFromView() != null,
+            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+      }
+
+      checkArgument(
+          formatFunction != null,
+          "A function must be provided to convert type into a TableRow. "
+              + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."
+              + "A format function is not required if Beam schemas are used.");
+
       Coder<DestinationT> destinationCoder = null;
       try {
         destinationCoder =
@@ -2014,7 +2085,7 @@ public class BigQueryIO {
       }
 
       Method method = resolveMethod(input);
-      if (getOptimizeWrites()) {
+      if (optimizeWrites) {
         PCollection<KV<DestinationT, T>> rowsWithDestination =
             input
                 .apply(
@@ -2026,12 +2097,12 @@ public class BigQueryIO {
             input.getCoder(),
             destinationCoder,
             dynamicDestinations,
-            getFormatFunction(),
+            formatFunction,
             method);
       } else {
         PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
             input
-                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
+                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction))
                 .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
         return continueExpandTyped(
             rowsWithDestination,
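
With this change a format function becomes optional whenever useBeamSchema() is set, and the two can still be combined: the inferred schema drives table creation while the user function controls row formatting. A minimal sketch, reusing the Quote type from the javadoc above (the table spec is a placeholder):

    PCollection<Quote> quotes = ...

    quotes.apply(BigQueryIO
        .<Quote>write()
        .to("my-project:my_dataset.my_table")
        .useBeamSchema() // infer the TableSchema from Quote's Beam schema
        // Optional: override the automatic Row-to-TableRow conversion.
        .withFormatFunction(
            q -> new TableRow().set("symbol", q.symbol).set("price", q.price))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
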
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index ceab03a..6d1d15a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -26,41 +26,30 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.joda.time.chrono.ISOChronology;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.DateTimeFormatterBuilder;
 
-/**
- * Utility methods for BigQuery related operations.
- *
- * <p><b>Example: Writing to BigQuery</b>
- *
- * <pre>{@code
- * PCollection<Row> rows = ...;
- *
- * rows.apply(BigQueryIO.<Row>write()
- *       .withSchema(BigQueryUtils.toTableSchema(rows))
- *       .withFormatFunction(BigQueryUtils.toTableRow())
- *       .to("my-project:my_dataset.my_table"));
- * }</pre>
- */
+/** Utility methods for BigQuery related operations. */
 public class BigQueryUtils {
   private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
       ImmutableMap.<TypeName, StandardSQLTypeName>builder()
@@ -76,6 +65,7 @@ public class BigQueryUtils {
           .put(TypeName.ROW, StandardSQLTypeName.STRUCT)
           .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
           .put(TypeName.STRING, StandardSQLTypeName.STRING)
+          .put(TypeName.BYTES, StandardSQLTypeName.BYTES)
           .build();
 
   private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =
@@ -137,12 +127,18 @@ public class BigQueryUtils {
       }
       if (TypeName.ARRAY == type.getTypeName()) {
         type = type.getCollectionElementType();
+        if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {
+          throw new IllegalArgumentException("Arrays of collections are not supported in BigQuery.");
+        }
         field.setMode(Mode.REPEATED.toString());
       }
       if (TypeName.ROW == type.getTypeName()) {
         Schema subType = type.getRowSchema();
         field.setFields(toTableFieldSchema(subType));
       }
+      if (TypeName.MAP == type.getTypeName()) {
+        throw new IllegalArgumentException("Maps are not supported in BigQuery.");
+      }
       field.setType(toStandardSQLTypeName(type).toString());
 
       fields.add(field);
@@ -155,17 +151,18 @@ public class BigQueryUtils {
     return new TableSchema().setFields(toTableFieldSchema(schema));
   }
 
-  /** Convert a Beam {@link PCollection} to a BigQuery {@link TableSchema}. */
-  public static TableSchema toTableSchema(PCollection<Row> rows) {
-    RowCoder coder = (RowCoder) rows.getCoder();
-    return toTableSchema(coder.getSchema());
-  }
-
-  private static final SerializableFunction<Row, TableRow> TO_TABLE_ROW = new ToTableRow();
+  private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
+      new ToTableRow(SerializableFunctions.identity());
 
   /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
   public static SerializableFunction<Row, TableRow> toTableRow() {
-    return TO_TABLE_ROW;
+    return ROW_TO_TABLE_ROW;
+  }
+
+  /** Convert a Beam schema type to a BigQuery {@link TableRow}. */
+  public static <T> SerializableFunction<T, TableRow> toTableRow(
+      SerializableFunction<T, Row> toRow) {
+    return new ToTableRow<>(toRow);
   }
 
   /** Convert {@link SchemaAndRecord} to a Beam {@link Row}. */
@@ -174,10 +171,16 @@ public class BigQueryUtils {
   }
 
   /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */
-  private static class ToTableRow implements SerializableFunction<Row, TableRow> {
+  private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {
+    private final SerializableFunction<T, Row> toRow;
+
+    ToTableRow(SerializableFunction<T, Row> toRow) {
+      this.toRow = toRow;
+    }
+
     @Override
-    public TableRow apply(Row input) {
-      return toTableRow(input);
+    public TableRow apply(T input) {
+      return toTableRow(toRow.apply(input));
     }
   }
 
@@ -214,40 +217,61 @@ public class BigQueryUtils {
     TableRow output = new TableRow();
     for (int i = 0; i < row.getFieldCount(); i++) {
       Object value = row.getValue(i);
-
       Field schemaField = row.getSchema().getField(i);
-      TypeName type = schemaField.getType().getTypeName();
-
-      switch (type) {
-        case ARRAY:
-          type = schemaField.getType().getCollectionElementType().getTypeName();
-          if (TypeName.ROW == type) {
-            List<Row> rows = (List<Row>) value;
-            List<TableRow> tableRows = new ArrayList<>(rows.size());
-            for (int j = 0; j < rows.size(); j++) {
-              tableRows.add(toTableRow(rows.get(j)));
-            }
-            value = tableRows;
-          }
-          break;
-        case ROW:
-          value = toTableRow((Row) value);
-          break;
-        case DATETIME:
-          DateTimeFormatter patternFormat =
-              new DateTimeFormatterBuilder()
-                  .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
-                  .toFormatter();
-          value = value == null ? null : ((Instant) value).toDateTime().toString(patternFormat);
-          break;
-        default:
-          value = row.getValue(i);
-          break;
+      output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value));
+    }
+    return output;
+  }
+
+  private static Object fromBeamField(FieldType fieldType, Object fieldValue) {
+    if (fieldValue == null) {
+      if (!fieldType.getNullable()) {
+        throw new IllegalArgumentException("Field is not nullable.");
       }
+      return null;
+    }
 
-      output = output.set(schemaField.getName(), value);
+    switch (fieldType.getTypeName()) {
+      case ARRAY:
+        FieldType elementType = fieldType.getCollectionElementType();
+        List items = (List) fieldValue;
+        List convertedItems = Lists.newArrayListWithCapacity(items.size());
+        for (Object item : items) {
+          convertedItems.add(fromBeamField(elementType, item));
+        }
+        return convertedItems;
+
+      case ROW:
+        return toTableRow((Row) fieldValue);
+
+      case DATETIME:
+        DateTimeFormatter patternFormat =
+            new DateTimeFormatterBuilder()
+                .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+                .toFormatter();
+        return ((Instant) fieldValue).toDateTime().toString(patternFormat);
+
+      case INT16:
+      case INT32:
+      case INT64:
+      case FLOAT:
+      case DOUBLE:
+      case STRING:
+      case BOOLEAN:
+        return fieldValue.toString();
+
+      case DECIMAL:
+        return fieldValue.toString();
+
+      case BYTES:
+        ByteBuffer byteBuffer = (ByteBuffer) fieldValue;
+        byte[] bytes = new byte[byteBuffer.limit()];
+        byteBuffer.get(bytes);
+        return BaseEncoding.base64().encode(bytes);
+
+      default:
+        return fieldValue;
     }
-    return output;
   }
 
   /**
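
The reworked conversion path above stringifies scalar values, formats DATETIME fields with the yyyy-MM-dd'T'HH:mm:ss.SSSZZ pattern, and base64-encodes BYTES; the test updates further down assert exactly this. A small sketch of the two public entry points (field names here are illustrative, not from this commit):

    Schema schema = Schema.builder()
        .addStringField("name")
        .addInt32Field("id")
        .build();
    Row row = Row.withSchema(schema).addValues("alice", 123).build();

    TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
    TableRow tableRow = BigQueryUtils.toTableRow().apply(row);
    // tableRow.get("name") is "alice"; tableRow.get("id") is the string "123",
    // matching the fromBeamField cases above.
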
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index c1567cc..ea3d435 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -112,11 +112,7 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
     return sideInputAccessor.sideInput(view);
   }
 
-  final void setSideInputAccessor(SideInputAccessor sideInputAccessor) {
-    this.sideInputAccessor = sideInputAccessor;
-  }
-
-  final void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+  void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
     this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
index 99c3f6f..d006220 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java
@@ -23,10 +23,13 @@ import com.google.api.services.bigquery.model.TableSchema;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -75,7 +78,7 @@ class DynamicDestinationsHelpers {
 
     @Override
     public Coder<TableDestination> getDestinationCoder() {
-      return TableDestinationCoder.of();
+      return TableDestinationCoderV2.of();
     }
   }
 
@@ -149,25 +152,42 @@ class DynamicDestinationsHelpers {
     }
 
     @Override
+    Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
+        throws CannotProvideCoderException {
+      return inner.getDestinationCoderWithDefault(registry);
+    }
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return inner.getSideInputs();
+    }
+
+    @Override
+    void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+      super.setSideInputAccessorFromProcessContext(context);
+      inner.setSideInputAccessorFromProcessContext(context);
+    }
+
+    @Override
     public String toString() {
       return MoreObjects.toStringHelper(this).add("inner", inner).toString();
     }
   }
 
   /** Returns the same schema for every table. */
-  static class ConstantSchemaDestinations<T>
-      extends DelegatingDynamicDestinations<T, TableDestination> {
+  static class ConstantSchemaDestinations<T, DestinationT>
+      extends DelegatingDynamicDestinations<T, DestinationT> {
     @Nullable private final ValueProvider<String> jsonSchema;
 
     ConstantSchemaDestinations(
-        DynamicDestinations<T, TableDestination> inner, ValueProvider<String> jsonSchema) {
+        DynamicDestinations<T, DestinationT> inner, ValueProvider<String> jsonSchema) {
       super(inner);
       checkArgument(jsonSchema != null, "jsonSchema can not be null");
       this.jsonSchema = jsonSchema;
     }
 
     @Override
-    public TableSchema getSchema(TableDestination destination) {
+    public TableSchema getSchema(DestinationT destination) {
       String jsonSchema = this.jsonSchema.get();
       checkArgument(jsonSchema != null, "jsonSchema can not be null");
       return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
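
The new forwarding overrides in DelegatingDynamicDestinations matter for side inputs: when useBeamSchema() wraps a user-supplied DynamicDestinations in ConstantSchemaDestinations, the inner instance still needs its side-input accessor and destination coder. A hedged sketch of a destinations class that would rely on this (the table-map side input is hypothetical):

    class PerUserDestinations extends DynamicDestinations<String, String> {
      private final PCollectionView<Map<String, String>> tableSpecs;

      PerUserDestinations(PCollectionView<Map<String, String>> tableSpecs) {
        this.tableSpecs = tableSpecs;
      }

      @Override
      public List<PCollectionView<?>> getSideInputs() {
        return ImmutableList.of(tableSpecs);
      }

      @Override
      public String getDestination(ValueInSingleWindow<String> element) {
        return element.getValue();
      }

      @Override
      public TableDestination getTable(String user) {
        // Resolved via the side input; still works when this instance is wrapped,
        // thanks to the forwarded setSideInputAccessorFromProcessContext above.
        return new TableDestination(sideInput(tableSpecs).get(user), null);
      }

      @Override
      public TableSchema getSchema(String user) {
        return null; // supplied by the ConstantSchemaDestinations wrapper
      }
    }
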
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index df18467..a2ea13b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -61,8 +61,14 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.JavaFieldSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -84,6 +90,7 @@ import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -166,7 +173,7 @@ public class BigQueryIOWriteTest implements Serializable {
   }
 
   // Create an intermediate type to ensure that coder inference up the inheritance tree is tested.
-  abstract static class StringIntegerDestinations extends DynamicDestinations<String, Integer> {}
+  abstract static class StringLongDestinations extends DynamicDestinations<String, Long> {}
 
   @Test
   public void testWriteEmptyPCollection() throws Exception {
@@ -193,15 +200,28 @@ public class BigQueryIOWriteTest implements Serializable {
 
   @Test
   public void testWriteDynamicDestinationsBatch() throws Exception {
-    writeDynamicDestinations(false);
+    writeDynamicDestinations(false, false);
+  }
+
+  @Test
+  public void testWriteDynamicDestinationsBatchWithSchemas() throws Exception {
+    writeDynamicDestinations(false, true);
   }
 
   @Test
   public void testWriteDynamicDestinationsStreaming() throws Exception {
-    writeDynamicDestinations(true);
+    writeDynamicDestinations(true, false);
   }
 
-  public void writeDynamicDestinations(boolean streaming) throws Exception {
+  @Test
+  public void testWriteDynamicDestinationsStreamingWithSchemas() throws Exception {
+    writeDynamicDestinations(true, true);
+  }
+
+  public void writeDynamicDestinations(boolean streaming, boolean schemas) throws Exception {
+    final Schema schema =
+        Schema.builder().addField("name", FieldType.STRING).addField("id", FieldType.INT32).build();
+
     final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)");
 
     final PCollectionView<List<String>> sideInput1 =
@@ -231,43 +251,45 @@ public class BigQueryIOWriteTest implements Serializable {
       users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
     }
 
+    if (schemas) {
+      users =
+          users.setSchema(
+              schema,
+              user -> {
+                Matcher matcher = userPattern.matcher(user);
+                checkState(matcher.matches());
+                return Row.withSchema(schema)
+                    .addValue(matcher.group(1))
+                    .addValue(Integer.valueOf(matcher.group(2)))
+                    .build();
+              },
+              r -> r.getString(0) + r.getInt32(1));
+    }
+
     // Use a partition decorator to verify that partition decorators are supported.
     final String partitionDecorator = "20171127";
 
-    users.apply(
-        "WriteBigQuery",
+    BigQueryIO.Write<String> write =
         BigQueryIO.<String>write()
             .withTestServices(fakeBqServices)
             .withMaxFilesPerBundle(5)
             .withMaxFileSize(10)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
-            .withFormatFunction(
-                user -> {
-                  Matcher matcher = userPattern.matcher(user);
-                  if (matcher.matches()) {
-                    return new TableRow()
-                        .set("name", matcher.group(1))
-                        .set("id", Integer.valueOf(matcher.group(2)));
-                  }
-                  throw new RuntimeException("Unmatching element " + user);
-                })
             .to(
-                new StringIntegerDestinations() {
+                new StringLongDestinations() {
                   @Override
-                  public Integer getDestination(ValueInSingleWindow<String> element) {
+                  public Long getDestination(ValueInSingleWindow<String> element) {
                     assertThat(
                         element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
                     Matcher matcher = userPattern.matcher(element.getValue());
-                    if (matcher.matches()) {
-                      // Since we name tables by userid, we can simply store an Integer to represent
-                      // a table.
-                      return Integer.valueOf(matcher.group(2));
-                    }
-                    throw new RuntimeException("Unmatching destination " + element.getValue());
+                    checkState(matcher.matches());
+                    // Since we name tables by userid, we can simply store a Long to represent
+                    // a table.
+                    return Long.valueOf(matcher.group(2));
                   }
 
                   @Override
-                  public TableDestination getTable(Integer userId) {
+                  public TableDestination getTable(Long userId) {
                     verifySideInputs();
                    // Each user in its own table.
                     return new TableDestination(
@@ -276,7 +298,7 @@ public class BigQueryIOWriteTest implements Serializable {
                   }
 
                   @Override
-                  public TableSchema getSchema(Integer userId) {
+                  public TableSchema getSchema(Long userId) {
                     verifySideInputs();
                     return new TableSchema()
                         .setFields(
@@ -299,21 +321,33 @@ public class BigQueryIOWriteTest implements Serializable {
                         allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
                   }
                 })
-            .withoutValidation());
+            .withoutValidation();
+    if (schemas) {
+      write = write.useBeamSchema();
+    } else {
+      write =
+          write.withFormatFunction(
+              user -> {
+                Matcher matcher = userPattern.matcher(user);
+                checkState(matcher.matches());
+                return new TableRow().set("name", matcher.group(1)).set("id", matcher.group(2));
+              });
+    }
+    users.apply("WriteBigQuery", write);
     p.run();
 
-    Map<Integer, List<TableRow>> expectedTableRows = Maps.newHashMap();
+    Map<Long, List<TableRow>> expectedTableRows = Maps.newHashMap();
     for (String anUserList : userList) {
       Matcher matcher = userPattern.matcher(anUserList);
       checkState(matcher.matches());
       String nickname = matcher.group(1);
-      int userid = Integer.valueOf(matcher.group(2));
+      Long userid = Long.valueOf(matcher.group(2));
       List<TableRow> expected =
           expectedTableRows.computeIfAbsent(userid, k -> Lists.newArrayList());
-      expected.add(new TableRow().set("name", nickname).set("id", userid));
+      expected.add(new TableRow().set("name", nickname).set("id", userid.toString()));
     }
 
-    for (Map.Entry<Integer, List<TableRow>> entry : expectedTableRows.entrySet()) {
+    for (Map.Entry<Long, List<TableRow>> entry : expectedTableRows.entrySet()) {
       assertThat(
           fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()),
           containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class)));
@@ -577,6 +611,72 @@ public class BigQueryIOWriteTest implements Serializable {
             new TableRow().set("name", "d").set("number", 4)));
   }
 
+  @DefaultSchema(JavaFieldSchema.class)
+  static class SchemaPojo {
+    final String name;
+    final int number;
+
+    @SchemaCreate
+    SchemaPojo(String name, int number) {
+      this.name = name;
+      this.number = number;
+    }
+  }
+
+  @Test
+  public void testSchemaWriteLoads() throws Exception {
+    p.apply(
+            Create.of(
+                new SchemaPojo("a", 1),
+                new SchemaPojo("b", 2),
+                new SchemaPojo("c", 3),
+                new SchemaPojo("d", 4)))
+        .apply(
+            BigQueryIO.<SchemaPojo>write()
+                .to("project-id:dataset-id.table-id")
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                .withMethod(Method.FILE_LOADS)
+                .useBeamSchema()
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    p.run();
+
+    assertThat(
+        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(
+            new TableRow().set("name", "a").set("number", "1"),
+            new TableRow().set("name", "b").set("number", "2"),
+            new TableRow().set("name", "c").set("number", "3"),
+            new TableRow().set("name", "d").set("number", "4")));
+  }
+
+  @Test
+  public void testSchemaWriteStreams() throws Exception {
+    p.apply(
+            Create.of(
+                new SchemaPojo("a", 1),
+                new SchemaPojo("b", 2),
+                new SchemaPojo("c", 3),
+                new SchemaPojo("d", 4)))
+        .apply(
+            BigQueryIO.<SchemaPojo>write()
+                .to("project-id:dataset-id.table-id")
+                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
+                .withMethod(Method.STREAMING_INSERTS)
+                .useBeamSchema()
+                .withTestServices(fakeBqServices)
+                .withoutValidation());
+    p.run();
+
+    assertThat(
+        fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(
+            new TableRow().set("name", "a").set("number", "1"),
+            new TableRow().set("name", "b").set("number", "2"),
+            new TableRow().set("name", "c").set("number", "3"),
+            new TableRow().set("name", "d").set("number", "4")));
+  }
+
   /**
    * A generic window function that allows partitioning data into windows by a string value.
    *
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 2f599d7..3e8a880 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -136,17 +136,17 @@ public class BigQueryUtilsTest {
     TableRow row = toTableRow().apply(FLAT_ROW);
 
     assertThat(row.size(), equalTo(5));
-    assertThat(row, hasEntry("id", 123L));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("id", "123"));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
   }
 
   @Test
   public void testToTableRow_array() {
     TableRow row = toTableRow().apply(ARRAY_ROW);
 
-    assertThat(row, hasEntry("ids", Arrays.asList(123L, 124L)));
+    assertThat(row, hasEntry("ids", Arrays.asList("123", "124")));
     assertThat(row.size(), equalTo(1));
   }
 
@@ -157,10 +157,10 @@ public class BigQueryUtilsTest {
     assertThat(row.size(), equalTo(1));
     row = (TableRow) row.get("row");
     assertThat(row.size(), equalTo(5));
-    assertThat(row, hasEntry("id", 123L));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("id", "123"));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
   }
 
   @Test
@@ -170,10 +170,10 @@ public class BigQueryUtilsTest {
     assertThat(row.size(), equalTo(1));
     row = ((List<TableRow>) row.get("rows")).get(0);
     assertThat(row.size(), equalTo(5));
-    assertThat(row, hasEntry("id", 123L));
-    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("id", "123"));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
-    assertThat(row, hasEntry("valid", false));
+    assertThat(row, hasEntry("valid", "false"));
   }
 
   @Test