You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:45 UTC

[3/6] beam git commit: Refactor BigQueryIO.Write helper transforms into separate files.

Refactor BigQueryIO.Write helper transforms into separate files.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6ef0104
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6ef0104
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6ef0104

Branch: refs/heads/master
Commit: d6ef0104d5b8e1076cea2a9d08ad14ba0e8e84f1
Parents: 2cc2e81
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 17 15:30:53 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Mar 28 08:46:15 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  182 +++
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |   64 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 1063 +-----------------
 .../io/gcp/bigquery/BigQueryQuerySource.java    |   21 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java |   20 +-
 .../io/gcp/bigquery/BigQueryTableSource.java    |   18 +
 .../io/gcp/bigquery/PassThroughThenCleanup.java |   18 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |   44 +
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |   87 ++
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    |   98 ++
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |  186 +++
 .../beam/sdk/io/gcp/bigquery/TableRowInfo.java  |   34 +
 .../sdk/io/gcp/bigquery/TableRowInfoCoder.java  |   68 ++
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |   84 ++
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  |  135 +++
 .../sdk/io/gcp/bigquery/TransformingSource.java |   18 +
 .../beam/sdk/io/gcp/bigquery/WriteBundles.java  |   82 ++
 .../sdk/io/gcp/bigquery/WritePartition.java     |   79 ++
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |  180 +++
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  213 ++++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   39 +-
 21 files changed, 1653 insertions(+), 1080 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
new file mode 100644
index 0000000..75b1cc7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
+ */
+class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, PDone> {
+  BigQueryIO.Write<T> write;
+
+  BatchLoadBigQuery(BigQueryIO.Write<T> write) {
+    this.write = write;
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    Pipeline p = input.getPipeline();
+    BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
+
+    final String stepUuid = BigQueryHelpers.randomUUIDString();
+
+    String tempLocation = options.getTempLocation();
+    String tempFilePrefix;
+    try {
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+      tempFilePrefix = factory.resolve(
+          factory.resolve(tempLocation, "BigQueryWriteTemp"),
+          stepUuid);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+          e);
+    }
+
+    // Create a singleton job ID token at execution time.
+    PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+    PCollectionView<String> jobIdTokenView = p
+        .apply("TriggerIdCreation", Create.of("ignored"))
+        .apply("CreateJobId", MapElements.via(
+            new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return stepUuid;
+              }
+            }))
+        .apply(View.<String>asSingleton());
+
+    PCollection<T> typedInputInGlobalWindow =
+        input.apply(
+            Window.<T>into(new GlobalWindows())
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes());
+    // Avoid applying the formatFunction if it is the identity formatter.
+    PCollection<TableRow> inputInGlobalWindow;
+    if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
+      inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
+    } else {
+      inputInGlobalWindow = typedInputInGlobalWindow
+          .apply(MapElements.via(write.getFormatFunction())
+              .withOutputType(new TypeDescriptor<TableRow>() {
+              }));
+    }
+
+    // PCollection of filename, file byte size.
+    PCollection<KV<String, Long>> results = inputInGlobalWindow
+        .apply("WriteBundles",
+            ParDo.of(new WriteBundles(tempFilePrefix)));
+
+    TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<Long, List<String>>> singlePartitionTag =
+        new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+    // Turn the list of files and record counts in a PCollectionView that can be used as a
+    // side input.
+    PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+        .apply("ResultsView", View.<KV<String, Long>>asIterable());
+    PCollectionTuple partitions = singleton.apply(ParDo
+        .of(new WritePartition(
+            resultsView,
+            multiPartitionsTag,
+            singlePartitionTag))
+        .withSideInputs(resultsView)
+        .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+    // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
+    // the import needs to be split into multiple partitions, and those partitions will be
+    // specified in multiPartitionsTag.
+    PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+        .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+        .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+            false,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getJsonSchema(),
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED,
+            write.getTableDescription()))
+            .withSideInputs(jobIdTokenView));
+
+    PCollectionView<Iterable<String>> tempTablesView = tempTables
+        .apply("TempTablesView", View.<String>asIterable());
+    singleton.apply(ParDo
+        .of(new WriteRename(
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            tempTablesView,
+            write.getTableDescription()))
+        .withSideInputs(tempTablesView, jobIdTokenView));
+
+    // Write single partition to final table
+    partitions.get(singlePartitionTag)
+        .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+        .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+            true,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            write.getJsonSchema(),
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            write.getTableDescription()))
+            .withSideInputs(jobIdTokenView));
+
+    return PDone.in(input.getPipeline());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 9fba938..37ff124 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -1,5 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
@@ -14,16 +34,34 @@ import java.util.UUID;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
 
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
-
 /**
  * A set of helper functions and classes used by {@link BigQueryIO}.
  */
-public class BigQueryHelpers {
+class BigQueryHelpers {
+  private static final String RESOURCE_NOT_FOUND_ERROR =
+      "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
+          + " execution. If the %1$s is created by an earlier stage of the pipeline, this"
+          + " validation can be disabled using #withoutValidation.";
+
+  private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
+      "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by"
+          + " an earlier stage of the pipeline, this validation can be disabled using"
+          + " #withoutValidation.";
+
+  /**
+   * Status of a BigQuery job or request.
+   */
+  enum Status {
+    SUCCEEDED,
+    FAILED,
+    UNKNOWN,
+  }
+
   @Nullable
   /**
    * Return a displayable string representation for a {@link TableReference}.
@@ -138,6 +176,26 @@ public class BigQueryHelpers {
     return UUID.randomUUID().toString().replaceAll("-", "");
   }
 
+  static void verifyTableNotExistOrEmpty(
+      DatasetService datasetService,
+      TableReference tableRef) {
+    try {
+      if (datasetService.getTable(tableRef) != null) {
+        checkState(
+            datasetService.isTableEmpty(tableRef),
+            "BigQuery table is not empty: %s.",
+            toTableSpec(tableRef));
+      }
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(
+          "unable to confirm BigQuery table emptiness for table "
+              + toTableSpec(tableRef), e);
+    }
+  }
+
   @VisibleForTesting
   static class JsonSchemaToTableSchema
       implements SerializableFunction<String, TableSchema> {

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3f2f3e8..4917083 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -18,19 +18,13 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -39,54 +33,30 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.common.io.CountingOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -94,41 +64,19 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.FileIOChannelFactory;
-import org.apache.beam.sdk.util.GcsIOChannelFactory;
-import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -296,7 +244,7 @@ public class BigQueryIO {
    * A formatting function that maps a TableRow to itself. This allows sending a
    * {@code PCollection<TableRow>} directly to BigQueryIO.Write.
    */
-  private static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
+   static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER =
       new SerializableFunction<TableRow, TableRow>() {
     @Override
     public TableRow apply(TableRow input) {
@@ -622,7 +570,7 @@ public class BigQueryIO {
      *
      * <p>If the table's project is not specified, use the executing project.
      */
-    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
         BigQueryOptions bqOptions) {
       ValueProvider<TableReference> table = getTableProvider();
       if (table == null) {
@@ -753,11 +701,11 @@ public class BigQueryIO {
     static final long MAX_SIZE_BYTES = 11 * (1L << 40);
 
     // The maximum number of retry jobs.
-    private static final int MAX_RETRY_JOBS = 3;
+    static final int MAX_RETRY_JOBS = 3;
 
     // The maximum number of retries to poll the status of a job.
     // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-    private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+    static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
     @Nullable abstract ValueProvider<String> getJsonTableRef();
     @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableReference>
@@ -974,26 +922,6 @@ public class BigQueryIO {
       return toBuilder().setBigQueryServices(testServices).build();
     }
 
-    private static void verifyTableNotExistOrEmpty(
-        DatasetService datasetService,
-        TableReference tableRef) {
-      try {
-        if (datasetService.getTable(tableRef) != null) {
-          checkState(
-              datasetService.isTableEmpty(tableRef),
-              "BigQuery table is not empty: %s.",
-              BigQueryHelpers.toTableSpec(tableRef));
-        }
-      } catch (IOException | InterruptedException e) {
-        if (e instanceof InterruptedException) {
-          Thread.currentThread().interrupt();
-        }
-        throw new RuntimeException(
-            "unable to confirm BigQuery table emptiness for table "
-                + BigQueryHelpers.toTableSpec(tableRef), e);
-      }
-    }
-
     @Override
     public void validate(PCollection<T> input) {
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
@@ -1030,7 +958,7 @@ public class BigQueryIO {
           verifyTablePresence(datasetService, table);
         }
         if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-          verifyTableNotExistOrEmpty(datasetService, table);
+          BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
         }
       }
 
@@ -1074,176 +1002,12 @@ public class BigQueryIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      Pipeline p = input.getPipeline();
-      BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
         return input.apply(new StreamWithDeDup<T>(this));
-      }
-
-      ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
-      final String stepUuid = BigQueryHelpers.randomUUIDString();
-
-      String tempLocation = options.getTempLocation();
-      String tempFilePrefix;
-      try {
-        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-        tempFilePrefix = factory.resolve(
-                factory.resolve(tempLocation, "BigQueryWriteTemp"),
-                stepUuid);
-      } catch (IOException e) {
-        throw new RuntimeException(
-            String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-            e);
-      }
-
-      // Create a singleton job ID token at execution time.
-      PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-      PCollectionView<String> jobIdTokenView = p
-          .apply("TriggerIdCreation", Create.of("ignored"))
-          .apply("CreateJobId", MapElements.via(
-              new SimpleFunction<String, String>() {
-                @Override
-                public String apply(String input) {
-                  return stepUuid;
-                }
-              }))
-          .apply(View.<String>asSingleton());
-
-      PCollection<T> typedInputInGlobalWindow =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-      // Avoid applying the formatFunction if it is the identity formatter.
-      PCollection<TableRow> inputInGlobalWindow;
-      if (getFormatFunction() == IDENTITY_FORMATTER) {
-        inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
       } else {
-        inputInGlobalWindow = typedInputInGlobalWindow
-            .apply(MapElements.via(getFormatFunction())
-                .withOutputType(new TypeDescriptor<TableRow>() {
-                }));
-      }
-
-      // PCollection of filename, file byte size.
-      PCollection<KV<String, Long>> results = inputInGlobalWindow
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles(tempFilePrefix)));
-
-      TupleTag<KV<Long, List<String>>> multiPartitionsTag =
-          new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
-      TupleTag<KV<Long, List<String>>> singlePartitionTag =
-          new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
-      // Turn the list of files and record counts in a PCollectionView that can be used as a
-      // side input.
-      PCollectionView<Iterable<KV<String, Long>>> resultsView = results
-          .apply("ResultsView", View.<KV<String, Long>>asIterable());
-      PCollectionTuple partitions = singleton.apply(ParDo
-          .of(new WritePartition(
-              resultsView,
-              multiPartitionsTag,
-              singlePartitionTag))
-          .withSideInputs(resultsView)
-          .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
-      // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
-      // the import needs to be split into multiple partitions, and those partitions will be
-      // specified in multiPartitionsTag.
-      PCollection<String> tempTables = partitions.get(multiPartitionsTag)
-          .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
-          .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
-              false,
-              getBigQueryServices(),
-              jobIdTokenView,
-              tempFilePrefix,
-              NestedValueProvider.of(table, new TableRefToJson()),
-              getJsonSchema(),
-              WriteDisposition.WRITE_EMPTY,
-              CreateDisposition.CREATE_IF_NEEDED,
-              getTableDescription()))
-          .withSideInputs(jobIdTokenView));
-
-      PCollectionView<Iterable<String>> tempTablesView = tempTables
-          .apply("TempTablesView", View.<String>asIterable());
-      singleton.apply(ParDo
-          .of(new WriteRename(
-              getBigQueryServices(),
-              jobIdTokenView,
-              NestedValueProvider.of(table, new TableRefToJson()),
-              getWriteDisposition(),
-              getCreateDisposition(),
-              tempTablesView,
-              getTableDescription()))
-          .withSideInputs(tempTablesView, jobIdTokenView));
-
-      // Write single partition to final table
-      partitions.get(singlePartitionTag)
-          .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
-          .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
-              true,
-              getBigQueryServices(),
-              jobIdTokenView,
-              tempFilePrefix,
-              NestedValueProvider.of(table, new TableRefToJson()),
-              getJsonSchema(),
-              getWriteDisposition(),
-              getCreateDisposition(),
-              getTableDescription()))
-          .withSideInputs(jobIdTokenView));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
-      private transient TableRowWriter writer = null;
-      private final String tempFilePrefix;
-
-      WriteBundles(String tempFilePrefix) {
-        this.tempFilePrefix = tempFilePrefix;
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        if (writer == null) {
-          writer = new TableRowWriter(tempFilePrefix);
-          writer.open(UUID.randomUUID().toString());
-          LOG.debug("Done opening writer {}", writer);
-        }
-        try {
-          writer.write(c.element());
-        } catch (Exception e) {
-          // Discard write result and close the write.
-          try {
-            writer.close();
-            // The writer does not need to be reset, as this DoFn cannot be reused.
-          } catch (Exception closeException) {
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
-        }
-      }
-
-      @FinishBundle
-      public void finishBundle(Context c) throws Exception {
-        if (writer != null) {
-          c.output(writer.close());
-          writer = null;
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        builder
-            .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-                .withLabel("Temporary File Prefix"));
+        return input.apply(new BatchLoadBigQuery<T>(this));
       }
     }
 
@@ -1289,7 +1053,7 @@ public class BigQueryIO {
      *
      * <p>If the table's project is not specified, use the executing project.
      */
-    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
         BigQueryOptions bqOptions) {
       ValueProvider<TableReference> table = getTable();
       if (table == null) {
@@ -1319,391 +1083,6 @@ public class BigQueryIO {
     }
 
 
-    static class TableRowWriter {
-      private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
-      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-      private final String tempFilePrefix;
-      private String id;
-      private String fileName;
-      private WritableByteChannel channel;
-      protected String mimeType = MimeTypes.TEXT;
-      private CountingOutputStream out;
-
-      TableRowWriter(String basename) {
-        this.tempFilePrefix = basename;
-      }
-
-      public final void open(String uId) throws Exception {
-        id = uId;
-        fileName = tempFilePrefix + id;
-        LOG.debug("Opening {}.", fileName);
-        channel = IOChannelUtils.create(fileName, mimeType);
-        try {
-          out = new CountingOutputStream(Channels.newOutputStream(channel));
-          LOG.debug("Writing header to {}.", fileName);
-        } catch (Exception e) {
-          try {
-            LOG.error("Writing header to {} failed, closing channel.", fileName);
-            channel.close();
-          } catch (IOException closeException) {
-            LOG.error("Closing channel for {} failed", fileName);
-          }
-          throw e;
-        }
-        LOG.debug("Starting write of bundle {} to {}.", this.id, fileName);
-      }
-
-      public void write(TableRow value) throws Exception {
-        CODER.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
-      }
-
-      public final KV<String, Long> close() throws IOException {
-        channel.close();
-        return KV.of(fileName, out.getCount());
-      }
-    }
-
-    /**
-     * Partitions temporary files based on number of files and file sizes.
-     */
-    static class WritePartition extends DoFn<String, KV<Long, List<String>>> {
-      private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
-      private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
-      private TupleTag<KV<Long, List<String>>> singlePartitionTag;
-
-      public WritePartition(
-          PCollectionView<Iterable<KV<String, Long>>> resultsView,
-          TupleTag<KV<Long, List<String>>> multiPartitionsTag,
-          TupleTag<KV<Long, List<String>>> singlePartitionTag) {
-        this.resultsView = resultsView;
-        this.multiPartitionsTag = multiPartitionsTag;
-        this.singlePartitionTag = singlePartitionTag;
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
-        if (results.isEmpty()) {
-          TableRowWriter writer = new TableRowWriter(c.element());
-          writer.open(UUID.randomUUID().toString());
-          results.add(writer.close());
-        }
-
-        long partitionId = 0;
-        int currNumFiles = 0;
-        long currSizeBytes = 0;
-        List<String> currResults = Lists.newArrayList();
-        for (int i = 0; i < results.size(); ++i) {
-          KV<String, Long> fileResult = results.get(i);
-          if (currNumFiles + 1 > Write.MAX_NUM_FILES
-              || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
-            c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
-            currResults = Lists.newArrayList();
-            currNumFiles = 0;
-            currSizeBytes = 0;
-          }
-          ++currNumFiles;
-          currSizeBytes += fileResult.getValue();
-          currResults.add(fileResult.getKey());
-        }
-        if (partitionId == 0) {
-          c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults));
-        } else {
-          c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
-        }
-      }
-    }
-
-    /**
-     * Writes partitions to BigQuery tables.
-     */
-    static class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
-      private final boolean singlePartition;
-      private final BigQueryServices bqServices;
-      private final PCollectionView<String> jobIdToken;
-      private final String tempFilePrefix;
-      private final ValueProvider<String> jsonTableRef;
-      private final ValueProvider<String> jsonSchema;
-      private final WriteDisposition writeDisposition;
-      private final CreateDisposition createDisposition;
-      @Nullable private final String tableDescription;
-
-      public WriteTables(
-          boolean singlePartition,
-          BigQueryServices bqServices,
-          PCollectionView<String> jobIdToken,
-          String tempFilePrefix,
-          ValueProvider<String> jsonTableRef,
-          ValueProvider<String> jsonSchema,
-          WriteDisposition writeDisposition,
-          CreateDisposition createDisposition,
-          @Nullable String tableDescription) {
-        this.singlePartition = singlePartition;
-        this.bqServices = bqServices;
-        this.jobIdToken = jobIdToken;
-        this.tempFilePrefix = tempFilePrefix;
-        this.jsonTableRef = jsonTableRef;
-        this.jsonSchema = jsonSchema;
-        this.writeDisposition = writeDisposition;
-        this.createDisposition = createDisposition;
-        this.tableDescription = tableDescription;
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
-        String jobIdPrefix = String.format(
-            c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
-        TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
-            TableReference.class);
-        if (!singlePartition) {
-          ref.setTableId(jobIdPrefix);
-        }
-
-        load(
-            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            jobIdPrefix,
-            ref,
-            BigQueryHelpers.fromJsonString(
-                jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
-            partition,
-            writeDisposition,
-            createDisposition,
-            tableDescription);
-        c.output(BigQueryHelpers.toJsonString(ref));
-
-        removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
-      }
-
-      private void load(
-          JobService jobService,
-          DatasetService datasetService,
-          String jobIdPrefix,
-          TableReference ref,
-          @Nullable TableSchema schema,
-          List<String> gcsUris,
-          WriteDisposition writeDisposition,
-          CreateDisposition createDisposition,
-          @Nullable String tableDescription) throws InterruptedException, IOException {
-        JobConfigurationLoad loadConfig = new JobConfigurationLoad()
-            .setDestinationTable(ref)
-            .setSchema(schema)
-            .setSourceUris(gcsUris)
-            .setWriteDisposition(writeDisposition.name())
-            .setCreateDisposition(createDisposition.name())
-            .setSourceFormat("NEWLINE_DELIMITED_JSON");
-
-        String projectId = ref.getProjectId();
-        Job lastFailedLoadJob = null;
-        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
-          String jobId = jobIdPrefix + "-" + i;
-          JobReference jobRef = new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId);
-          jobService.startLoadJob(jobRef, loadConfig);
-          Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
-          Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
-          switch (jobStatus) {
-            case SUCCEEDED:
-              if (tableDescription != null) {
-                datasetService.patchTableDescription(ref, tableDescription);
-              }
-              return;
-            case UNKNOWN:
-              throw new RuntimeException(String.format(
-                  "UNKNOWN status of load job [%s]: %s.", jobId,
-                  BigQueryHelpers.jobToPrettyString(loadJob)));
-            case FAILED:
-              lastFailedLoadJob = loadJob;
-              continue;
-            default:
-              throw new IllegalStateException(String.format(
-                  "Unexpected status [%s] of load job: %s.",
-                  jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
-          }
-        }
-        throw new RuntimeException(String.format(
-            "Failed to create load job with id prefix %s, "
-                + "reached max retries: %d, last failed load job: %s.",
-            jobIdPrefix,
-            Write.MAX_RETRY_JOBS,
-            BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
-      }
-
-      static void removeTemporaryFiles(
-          PipelineOptions options,
-          String tempFilePrefix,
-          Collection<String> files)
-          throws IOException {
-        IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
-        if (factory instanceof GcsIOChannelFactory) {
-          GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options);
-          gcsUtil.remove(files);
-        } else if (factory instanceof FileIOChannelFactory) {
-          for (String filename : files) {
-            LOG.debug("Removing file {}", filename);
-            boolean exists = Files.deleteIfExists(Paths.get(filename));
-            if (!exists) {
-              LOG.debug("{} does not exist.", filename);
-            }
-          }
-        } else {
-          throw new IOException("Unrecognized file system.");
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        builder
-            .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-                .withLabel("Temporary File Prefix"))
-            .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
-                .withLabel("Table Reference"))
-            .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
-                .withLabel("Table Schema"))
-            .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-                .withLabel("Table Description"));
-      }
-    }
-
-    /**
-     * Copies temporary tables to destination table.
-     */
-    static class WriteRename extends DoFn<String, Void> {
-      private final BigQueryServices bqServices;
-      private final PCollectionView<String> jobIdToken;
-      private final ValueProvider<String> jsonTableRef;
-      private final WriteDisposition writeDisposition;
-      private final CreateDisposition createDisposition;
-      private final PCollectionView<Iterable<String>> tempTablesView;
-      @Nullable private final String tableDescription;
-
-      public WriteRename(
-          BigQueryServices bqServices,
-          PCollectionView<String> jobIdToken,
-          ValueProvider<String> jsonTableRef,
-          WriteDisposition writeDisposition,
-          CreateDisposition createDisposition,
-          PCollectionView<Iterable<String>> tempTablesView,
-          @Nullable String tableDescription) {
-        this.bqServices = bqServices;
-        this.jobIdToken = jobIdToken;
-        this.jsonTableRef = jsonTableRef;
-        this.writeDisposition = writeDisposition;
-        this.createDisposition = createDisposition;
-        this.tempTablesView = tempTablesView;
-        this.tableDescription = tableDescription;
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
-
-        // Do not copy if no temp tables are provided
-        if (tempTablesJson.size() == 0) {
-          return;
-        }
-
-        List<TableReference> tempTables = Lists.newArrayList();
-        for (String table : tempTablesJson) {
-          tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
-        }
-        copy(
-            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            c.sideInput(jobIdToken),
-            BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
-            tempTables,
-            writeDisposition,
-            createDisposition,
-            tableDescription);
-
-        DatasetService tableService =
-            bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
-        removeTemporaryTables(tableService, tempTables);
-      }
-
-      private void copy(
-          JobService jobService,
-          DatasetService datasetService,
-          String jobIdPrefix,
-          TableReference ref,
-          List<TableReference> tempTables,
-          WriteDisposition writeDisposition,
-          CreateDisposition createDisposition,
-          @Nullable String tableDescription) throws InterruptedException, IOException {
-        JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy()
-            .setSourceTables(tempTables)
-            .setDestinationTable(ref)
-            .setWriteDisposition(writeDisposition.name())
-            .setCreateDisposition(createDisposition.name());
-
-        String projectId = ref.getProjectId();
-        Job lastFailedCopyJob = null;
-        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
-          String jobId = jobIdPrefix + "-" + i;
-          JobReference jobRef = new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId);
-          jobService.startCopyJob(jobRef, copyConfig);
-          Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
-          Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
-          switch (jobStatus) {
-            case SUCCEEDED:
-              if (tableDescription != null) {
-                datasetService.patchTableDescription(ref, tableDescription);
-              }
-              return;
-            case UNKNOWN:
-              throw new RuntimeException(String.format(
-                  "UNKNOWN status of copy job [%s]: %s.", jobId,
-                  BigQueryHelpers.jobToPrettyString(copyJob)));
-            case FAILED:
-              lastFailedCopyJob = copyJob;
-              continue;
-            default:
-              throw new IllegalStateException(String.format(
-                  "Unexpected status [%s] of load job: %s.",
-                  jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
-          }
-        }
-        throw new RuntimeException(String.format(
-            "Failed to create copy job with id prefix %s, "
-                + "reached max retries: %d, last failed copy job: %s.",
-            jobIdPrefix,
-            Write.MAX_RETRY_JOBS,
-            BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
-      }
-
-      static void removeTemporaryTables(DatasetService tableService,
-          List<TableReference> tempTables) {
-        for (TableReference tableRef : tempTables) {
-          try {
-            LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
-            tableService.deleteTable(tableRef);
-          } catch (Exception e) {
-            LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
-          }
-        }
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        builder
-            .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
-                .withLabel("Table Reference"))
-            .add(DisplayData.item("writeDisposition", writeDisposition.toString())
-                .withLabel("Write Disposition"))
-            .add(DisplayData.item("createDisposition", createDisposition.toString())
-                .withLabel("Create Disposition"));
-      }
-    }
   }
 
   private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
@@ -1753,434 +1132,6 @@ public class BigQueryIO {
   static void clearCreatedTables() {
     StreamingWriteFn.clearCreatedTables();
   }
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Implementation of DoFn to perform streaming BigQuery write.
-   */
-  @VisibleForTesting
-  static class StreamingWriteFn
-      extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
-    /** TableSchema in JSON. Use String to make the class Serializable. */
-    @Nullable private final ValueProvider<String> jsonTableSchema;
-
-    @Nullable private final String tableDescription;
-
-    private final BigQueryServices bqServices;
-
-    /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
-    private transient Map<String, List<TableRow>> tableRows;
-
-    private final Write.CreateDisposition createDisposition;
-
-    /** The list of unique ids for each BigQuery table row. */
-    private transient Map<String, List<String>> uniqueIdsForTableRows;
-
-    /** The list of tables created so far, so we don't try the creation
-        each time. */
-    private static Set<String> createdTables =
-        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-
-    /** Tracks bytes written, exposed as "ByteCount" Metrics Counter. */
-    private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount");
-
-    /** Constructor. */
-    StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
-                     Write.CreateDisposition createDisposition,
-                     @Nullable String tableDescription, BigQueryServices bqServices) {
-      this.jsonTableSchema = schema == null ? null :
-          NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
-      this.createDisposition = createDisposition;
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.tableDescription = tableDescription;
-    }
-
-    /**
-     * Clear the cached map of created tables. Used for testing.
-     */
-    private static void clearCreatedTables() {
-      synchronized (createdTables) {
-        createdTables.clear();
-      }
-    }
-
-    /** Prepares a target BigQuery table. */
-    @StartBundle
-    public void startBundle(Context context) {
-      tableRows = new HashMap<>();
-      uniqueIdsForTableRows = new HashMap<>();
-    }
-
-    /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      String tableSpec = context.element().getKey().getKey();
-      List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
-      List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
-          tableSpec);
-
-      rows.add(context.element().getValue().tableRow);
-      uniqueIds.add(context.element().getValue().uniqueId);
-    }
-
-    /** Writes the accumulated rows into BigQuery with streaming API. */
-    @FinishBundle
-    public void finishBundle(Context context) throws Exception {
-      BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-
-      for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
-        TableReference tableReference = getOrCreateTable(options, entry.getKey());
-        flushRows(tableReference, entry.getValue(),
-            uniqueIdsForTableRows.get(entry.getKey()), options);
-      }
-      tableRows.clear();
-      uniqueIdsForTableRows.clear();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder
-          .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
-            .withLabel("Table Schema"))
-          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-            .withLabel("Table Description"));
-    }
-
-    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
-        throws InterruptedException, IOException {
-      TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
-      if (createDisposition != createDisposition.CREATE_NEVER
-          && !createdTables.contains(tableSpec)) {
-        synchronized (createdTables) {
-          // Another thread may have succeeded in creating the table in the meanwhile, so
-          // check again. This check isn't needed for correctness, but we add it to prevent
-          // every thread from attempting a create and overwhelming our BigQuery quota.
-          DatasetService datasetService = bqServices.getDatasetService(options);
-          if (!createdTables.contains(tableSpec)) {
-            if (datasetService.getTable(tableReference) == null) {
-              TableSchema tableSchema = JSON_FACTORY.fromString(
-                  jsonTableSchema.get(), TableSchema.class);
-              datasetService.createTable(
-                  new Table()
-                      .setTableReference(tableReference)
-                      .setSchema(tableSchema)
-                      .setDescription(tableDescription));
-            }
-            createdTables.add(tableSpec);
-          }
-        }
-      }
-      return tableReference;
-    }
-
-    /**
-     * Writes the accumulated rows into BigQuery with streaming API.
-     */
-    private void flushRows(TableReference tableReference,
-        List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options)
-            throws InterruptedException {
-      if (!tableRows.isEmpty()) {
-        try {
-          long totalBytes = bqServices.getDatasetService(options).insertAll(
-              tableReference, tableRows, uniqueIds);
-          byteCounter.inc(totalBytes);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  private static class ShardedKey<K> {
-    private final K key;
-    private final int shardNumber;
-
-    public static <K> ShardedKey<K> of(K key, int shardNumber) {
-      return new ShardedKey<>(key, shardNumber);
-    }
-
-    private ShardedKey(K key, int shardNumber) {
-      this.key = key;
-      this.shardNumber = shardNumber;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    public int getShardNumber() {
-      return shardNumber;
-    }
-  }
-
-  /**
-   * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
-   */
-  @VisibleForTesting
-  static class ShardedKeyCoder<KeyT>
-      extends StandardCoder<ShardedKey<KeyT>> {
-    public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
-      return new ShardedKeyCoder<>(keyCoder);
-    }
-
-    @JsonCreator
-    public static <KeyT> ShardedKeyCoder<KeyT> of(
-         @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<KeyT>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return of(components.get(0));
-    }
-
-    protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
-      this.keyCoder = keyCoder;
-      this.shardNumberCoder = VarIntCoder.of();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.asList(keyCoder);
-    }
-
-    @Override
-    public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
-        throws IOException {
-      keyCoder.encode(key.getKey(), outStream, context.nested());
-      shardNumberCoder.encode(key.getShardNumber(), outStream, context);
-    }
-
-    @Override
-    public ShardedKey<KeyT> decode(InputStream inStream, Context context)
-        throws IOException {
-      return new ShardedKey<>(
-          keyCoder.decode(inStream, context.nested()),
-          shardNumberCoder.decode(inStream, context));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      keyCoder.verifyDeterministic();
-    }
-
-    Coder<KeyT> keyCoder;
-    VarIntCoder shardNumberCoder;
-  }
-
-  @VisibleForTesting
-  static class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
-    private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
-
-    @JsonCreator
-    public static TableRowInfoCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(TableRowInfo value, OutputStream outStream, Context context)
-      throws IOException {
-      if (value == null) {
-        throw new CoderException("cannot encode a null value");
-      }
-      tableRowCoder.encode(value.tableRow, outStream, context.nested());
-      idCoder.encode(value.uniqueId, outStream, context);
-    }
-
-    @Override
-    public TableRowInfo decode(InputStream inStream, Context context)
-      throws IOException {
-      return new TableRowInfo(
-          tableRowCoder.decode(inStream, context.nested()),
-          idCoder.decode(inStream, context));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this, "TableRows are not deterministic.");
-    }
-
-    TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of();
-    StringUtf8Coder idCoder = StringUtf8Coder.of();
-  }
-
-  private static class TableRowInfo {
-    TableRowInfo(TableRow tableRow, String uniqueId) {
-      this.tableRow = tableRow;
-      this.uniqueId = uniqueId;
-    }
-
-    final TableRow tableRow;
-    final String uniqueId;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Fn that tags each table row with a unique id and destination table.
-   * To avoid calling UUID.randomUUID() for each element, which can be costly,
-   * a randomUUID is generated only once per bucket of data. The actual unique
-   * id is created by concatenating this randomUUID with a sequential number.
-   */
-  @VisibleForTesting
-  static class TagWithUniqueIdsAndTable<T>
-      extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
-    /** TableSpec to write to. */
-    private final ValueProvider<String> tableSpec;
-
-    /** User function mapping windowed values to {@link TableReference} in JSON. */
-    private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
-
-    /** User function mapping user type to a TableRow. */
-    private final SerializableFunction<T, TableRow> formatFunction;
-
-    private transient String randomUUID;
-    private transient long sequenceNo = 0L;
-
-    TagWithUniqueIdsAndTable(BigQueryOptions options,
-        ValueProvider<TableReference> table,
-        SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction,
-        SerializableFunction<T, TableRow> formatFunction) {
-      checkArgument(table == null ^ tableRefFunction == null,
-          "Exactly one of table or tableRefFunction should be set");
-      if (table != null) {
-        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
-          TableReference tableRef = table.get()
-              .setProjectId(options.as(BigQueryOptions.class).getProject());
-          table = NestedValueProvider.of(
-              StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
-              new JsonTableRefToTableRef());
-        }
-        this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
-      } else {
-        tableSpec = null;
-      }
-      this.tableRefFunction = tableRefFunction;
-      this.formatFunction = formatFunction;
-    }
-
-
-    @StartBundle
-    public void startBundle(Context context) {
-      randomUUID = UUID.randomUUID().toString();
-    }
-
-    /** Tag the input with a unique id. */
-    @ProcessElement
-    public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
-      String uniqueId = randomUUID + sequenceNo++;
-      ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
-      String tableSpec = tableSpecFromWindowedValue(
-          context.getPipelineOptions().as(BigQueryOptions.class),
-          ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
-      // We output on keys 0-50 to ensure that there's enough batching for
-      // BigQuery.
-      context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
-          new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder.addIfNotNull(DisplayData.item("table", tableSpec));
-      if (tableRefFunction != null) {
-        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
-          .withLabel("Table Reference Function"));
-      }
-    }
-
-    @VisibleForTesting
-    ValueProvider<String> getTableSpec() {
-      return tableSpec;
-    }
-
-    private String tableSpecFromWindowedValue(BigQueryOptions options,
-                                              ValueInSingleWindow<T> value) {
-      if (tableSpec != null) {
-        return tableSpec.get();
-      } else {
-        TableReference table = tableRefFunction.apply(value);
-        if (table.getProjectId() == null) {
-          table.setProjectId(options.getProject());
-        }
-        return BigQueryHelpers.toTableSpec(table);
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-  * PTransform that performs streaming BigQuery write. To increase consistency,
-  * it leverages BigQuery best effort de-dup mechanism.
-   */
-  private static class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
-    private final Write<T> write;
-
-    /** Constructor. */
-    StreamWithDeDup(Write<T> write) {
-      this.write = write;
-    }
-
-    @Override
-    protected Coder<Void> getDefaultOutputCoder() {
-      return VoidCoder.of();
-    }
-
-    @Override
-    public PDone expand(PCollection<T> input) {
-      // A naive implementation would be to simply stream data directly to BigQuery.
-      // However, this could occasionally lead to duplicated data, e.g., when
-      // a VM that runs this code is restarted and the code is re-run.
-
-      // The above risk is mitigated in this implementation by relying on
-      // BigQuery built-in best effort de-dup mechanism.
-
-      // To use this mechanism, each input TableRow is tagged with a generated
-      // unique id, which is then passed to BigQuery and used to ignore duplicates.
-
-      PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
-          input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
-              input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
-              write.getTableRefFunction(), write.getFormatFunction())));
-
-      // To prevent having the same TableRow processed more than once with regenerated
-      // different unique ids, this implementation relies on "checkpointing", which is
-      // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
-      // performed by Reshuffle.
-      NestedValueProvider<TableSchema, String> schema =
-          write.getJsonSchema() == null
-              ? null
-              : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
-      tagged
-          .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
-          .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(
-              ParDo.of(
-                  new StreamingWriteFn(
-                      schema,
-                      write.getCreateDisposition(),
-                      write.getTableDescription(),
-                      write.getBigQueryServices())));
-
-      // Note that the implementation to return PDone here breaks the
-      // implicit assumption about the job execution order. If a user
-      // implements a PTransform that takes PDone returned here as its
-      // input, the transform may not necessarily be executed after
-      // the BigQueryIO.Write.
-
-      return PDone.in(input.getPipeline());
-    }
-  }
-
-  /**
-   * Status of a BigQuery job or request.
-   */
-  enum Status {
-    SUCCEEDED,
-    FAILED,
-    UNKNOWN,
-  }
 
   /////////////////////////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index a909957..9153157 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -14,9 +32,10 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index ff50e6d..746258f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -18,7 +36,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index aae0faa..cbd5781 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
index 612afbe..75f7b93 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
new file mode 100644
index 0000000..8c968df
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/**
+ * A key and a shard number.
+ */
+class ShardedKey<K> {
+  private final K key;
+  private final int shardNumber;
+
+  public static <K> ShardedKey<K> of(K key, int shardNumber) {
+    return new ShardedKey<>(key, shardNumber);
+  }
+
+  ShardedKey(K key, int shardNumber) {
+    this.key = key;
+    this.shardNumber = shardNumber;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public int getShardNumber() {
+    return shardNumber;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
new file mode 100644
index 0000000..be4e71c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKeyCoder.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.util.PropertyNames;
+
+
+/**
+ * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
+ */
+@VisibleForTesting
+class ShardedKeyCoder<KeyT>
+    extends StandardCoder<ShardedKey<KeyT>> {
+  public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+    return new ShardedKeyCoder<>(keyCoder);
+  }
+
+  @JsonCreator
+  public static <KeyT> ShardedKeyCoder<KeyT> of(
+       @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+           List<Coder<KeyT>> components) {
+    checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+    return of(components.get(0));
+  }
+
+  protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+    this.keyCoder = keyCoder;
+    this.shardNumberCoder = VarIntCoder.of();
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(keyCoder);
+  }
+
+  @Override
+  public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
+      throws IOException {
+    keyCoder.encode(key.getKey(), outStream, context.nested());
+    shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+  }
+
+  @Override
+  public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+      throws IOException {
+    return new ShardedKey<>(
+        keyCoder.decode(inStream, context.nested()),
+        shardNumberCoder.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    keyCoder.verifyDeterministic();
+  }
+
+  Coder<KeyT> keyCoder;
+  VarIntCoder shardNumberCoder;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d6ef0104/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
new file mode 100644
index 0000000..f667295
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableSchema;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+/**
+* PTransform that performs streaming BigQuery write. To increase consistency,
+* it leverages BigQuery best effort de-dup mechanism.
+ */
+class StreamWithDeDup<T> extends PTransform<PCollection<T>, PDone> {
+  private final Write<T> write;
+
+  /** Constructor. */
+  StreamWithDeDup(Write<T> write) {
+    this.write = write;
+  }
+
+  @Override
+  protected Coder<Void> getDefaultOutputCoder() {
+    return VoidCoder.of();
+  }
+
+  @Override
+  public PDone expand(PCollection<T> input) {
+    // A naive implementation would be to simply stream data directly to BigQuery.
+    // However, this could occasionally lead to duplicated data, e.g., when
+    // a VM that runs this code is restarted and the code is re-run.
+
+    // The above risk is mitigated in this implementation by relying on
+    // BigQuery built-in best effort de-dup mechanism.
+
+    // To use this mechanism, each input TableRow is tagged with a generated
+    // unique id, which is then passed to BigQuery and used to ignore duplicates.
+
+    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
+        input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
+            input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
+            write.getTableRefFunction(), write.getFormatFunction())));
+
+    // To prevent having the same TableRow processed more than once with regenerated
+    // different unique ids, this implementation relies on "checkpointing", which is
+    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
+    // performed by Reshuffle.
+    NestedValueProvider<TableSchema, String> schema =
+        write.getJsonSchema() == null
+            ? null
+            : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
+    tagged
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+        .apply(
+            ParDo.of(
+                new StreamingWriteFn(
+                    schema,
+                    write.getCreateDisposition(),
+                    write.getTableDescription(),
+                    write.getBigQueryServices())));
+
+    // Note that the implementation to return PDone here breaks the
+    // implicit assumption about the job execution order. If a user
+    // implements a PTransform that takes PDone returned here as its
+    // input, the transform may not necessarily be executed after
+    // the BigQueryIO.Write.
+
+    return PDone.in(input.getPipeline());
+  }
+}