You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 05:11:02 UTC

[03/10] beam git commit: Separate streaming writes into two pluggable components - CreateTables, and StreamingWriteTables. Also address many code review comments. Also merge with master.

Separate streaming writes into two pluggable components - CreateTables, and StreamingWriteTables.
Also address many code review comments.
Also merge with master.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d13061c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d13061c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d13061c

Branch: refs/heads/master
Commit: 7d13061cc36466c502bbc1f61d391743dd3739af
Parents: b486137
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 2 21:39:50 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 176 ++++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    |  13 ++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  21 ++-
 .../io/gcp/bigquery/BigQueryTableSource.java    |   4 +-
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  |  95 ++++++----
 .../io/gcp/bigquery/GenerateShardedTable.java   |   3 +-
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |  80 +++++----
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |   1 +
 .../sdk/io/gcp/bigquery/StreamingInserts.java   |  44 +----
 .../io/gcp/bigquery/StreamingWriteTables.java   |  86 +++++++++
 .../sdk/io/gcp/bigquery/TableDestination.java   |   1 +
 .../io/gcp/bigquery/TableDestinationCoder.java  |  62 +++----
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |  14 +-
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |  14 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  25 +--
 .../sdk/io/gcp/bigquery/WritePartition.java     | 127 ++++++++-----
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |   5 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  17 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  66 ++++---
 19 files changed, 516 insertions(+), 338 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 06fdfce..236b234 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.io.IOException;
@@ -35,7 +34,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -54,17 +53,13 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-
-/**
- * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
- */
+/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
   BigQueryIO.Write<?> write;
 
-  private static class ConstantSchemaFunction implements
-      SerializableFunction<TableDestination, TableSchema> {
-    private final @Nullable
-    ValueProvider<String> jsonSchema;
+  private static class ConstantSchemaFunction
+      implements SerializableFunction<TableDestination, TableSchema> {
+    private final @Nullable ValueProvider<String> jsonSchema;
 
     ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
       this.jsonSchema = jsonSchema;
@@ -86,7 +81,6 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
   public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
     Pipeline p = input.getPipeline();
     BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
 
     final String stepUuid = BigQueryHelpers.randomUUIDString();
 
@@ -94,40 +88,41 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
     String tempFilePrefix;
     try {
       IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-      tempFilePrefix = factory.resolve(
-          factory.resolve(tempLocation, "BigQueryWriteTemp"),
-          stepUuid);
+      tempFilePrefix =
+          factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), stepUuid);
     } catch (IOException e) {
       throw new RuntimeException(
-          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-          e);
+          String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
     }
 
     // Create a singleton job ID token at execution time. This will be used as the base for all
     // load jobs issued from this instance of the transfomr.
     PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-    PCollectionView<String> jobIdTokenView = p
-        .apply("TriggerIdCreation", Create.of("ignored"))
-        .apply("CreateJobId", MapElements.via(
-            new SimpleFunction<String, String>() {
-              @Override
-              public String apply(String input) {
-                return stepUuid;
-              }
-            }))
-        .apply(View.<String>asSingleton());
+    PCollectionView<String> jobIdTokenView =
+        p.apply("TriggerIdCreation", Create.of("ignored"))
+            .apply(
+                "CreateJobId",
+                MapElements.via(
+                    new SimpleFunction<String, String>() {
+                      @Override
+                      public String apply(String input) {
+                        return stepUuid;
+                      }
+                    }))
+            .apply(View.<String>asSingleton());
 
     PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
-        input.apply("rewindowIntoGlobal",
+        input.apply(
+            "rewindowIntoGlobal",
             Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
                 .triggering(DefaultTrigger.of())
                 .discardingFiredPanes());
 
     // PCollection of filename, file byte size, and table destination.
-    PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow
-        .apply("WriteBundlesToFiles",
-            ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
-        .setCoder(WriteBundlesToFiles.ResultCoder.of());
+    PCollection<WriteBundlesToFiles.Result> results =
+        inputInGlobalWindow
+            .apply("WriteBundlesToFiles", ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
+            .setCoder(WriteBundlesToFiles.ResultCoder.of());
 
     TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
         new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
@@ -136,20 +131,23 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
 
     // Turn the list of files and record counts in a PCollectionView that can be used as a
     // side input.
-    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results
-        .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
+    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
+        results.apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
     // loading.
-    PCollectionTuple partitions = singleton.apply("WritePartition",
-        ParDo.of(new WritePartition(
-            write.getJsonTableRef(),
-            write.getTableDescription(),
-            resultsView,
-            multiPartitionsTag,
-            singlePartitionTag))
-        .withSideInputs(resultsView)
-        .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+    PCollectionTuple partitions =
+        singleton.apply(
+            "WritePartition",
+            ParDo.of(
+                    new WritePartition(
+                        write.getJsonTableRef(),
+                        write.getTableDescription(),
+                        resultsView,
+                        multiPartitionsTag,
+                        singlePartitionTag))
+                .withSideInputs(resultsView)
+                .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
 
     // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
     // schema function here. If no schema is specified, this function will return null.
@@ -158,55 +156,69 @@ class BatchLoads extends PTransform<PCollection<KV<TableDestination, TableRow>>,
         new ConstantSchemaFunction(write.getJsonSchema());
 
     Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder =
-        KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()),
-            ListCoder.of(StringUtf8Coder.of()));
+        KvCoder.of(
+            ShardedKeyCoder.of(TableDestinationCoder.of()), ListCoder.of(StringUtf8Coder.of()));
     // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
     // specified in multiPartitionsTag.
-    PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag)
-        .setCoder(partitionsCoder)
-        // What's this GroupByKey for? Is this so we have a deterministic temp tables? If so, maybe
-        // Reshuffle is better here.
-        .apply("MultiPartitionsGroupByKey",
-            GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
-        .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
-            false,
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            tempFilePrefix,
-            WriteDisposition.WRITE_EMPTY,
-            CreateDisposition.CREATE_IF_NEEDED,
-            schemaFunction))
-            .withSideInputs(jobIdTokenView));
+    PCollection<KV<TableDestination, String>> tempTables =
+        partitions
+            .get(multiPartitionsTag)
+            .setCoder(partitionsCoder)
+            // Reshuffle will distribute this among multiple workers, and also guard against
+            // reexecution of the WritePartitions step once WriteTables has begun.
+            .apply(
+                "MultiPartitionsReshuffle",
+                Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+            .apply(
+                "MultiPartitionsWriteTables",
+                ParDo.of(
+                        new WriteTables(
+                            false,
+                            write.getBigQueryServices(),
+                            jobIdTokenView,
+                            tempFilePrefix,
+                            WriteDisposition.WRITE_EMPTY,
+                            CreateDisposition.CREATE_IF_NEEDED,
+                            schemaFunction))
+                    .withSideInputs(jobIdTokenView));
 
     // This view maps each final table destination to the set of temporary partitioned tables
     // the PCollection was loaded into.
-    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables
-        .apply("TempTablesView", View.<TableDestination, String>asMultimap());
-
-    singleton.apply("WriteRename", ParDo
-        .of(new WriteRename(
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            write.getWriteDisposition(),
-            write.getCreateDisposition(),
-            tempTablesView))
-        .withSideInputs(tempTablesView, jobIdTokenView));
+    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
+        tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap());
+
+    singleton.apply(
+        "WriteRename",
+        ParDo.of(
+                new WriteRename(
+                    write.getBigQueryServices(),
+                    jobIdTokenView,
+                    write.getWriteDisposition(),
+                    write.getCreateDisposition(),
+                    tempTablesView))
+            .withSideInputs(tempTablesView, jobIdTokenView));
 
     // Write single partition to final table
-    partitions.get(singlePartitionTag)
+    partitions
+        .get(singlePartitionTag)
         .setCoder(partitionsCoder)
-        .apply("SinglePartitionGroupByKey",
-            GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
-        .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
-            true,
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            tempFilePrefix,
-            write.getWriteDisposition(),
-            write.getCreateDisposition(),
-            schemaFunction))
-            .withSideInputs(jobIdTokenView));
+        // Reshuffle will distribute this among multiple workers, and also guard against
+        // reexecution of the WritePartitions step once WriteTables has begun.
+        .apply(
+            "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<TableDestination>, List<String>>of())
+        .apply(
+            "SinglePartitionWriteTables",
+            ParDo.of(
+                    new WriteTables(
+                        true,
+                        write.getBigQueryServices(),
+                        jobIdTokenView,
+                        tempFilePrefix,
+                        write.getWriteDisposition(),
+                        write.getCreateDisposition(),
+                        schemaFunction))
+                .withSideInputs(jobIdTokenView));
 
     return WriteResult.in(input.getPipeline());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 846103d..e04361c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.hash.Hashing;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -234,6 +235,18 @@ public class BigQueryHelpers {
     }
   }
 
+  // Create a unique job id for a table load.
+  static String createJobId(String prefix, TableDestination tableDestination, int partition) {
+    // Job ID must be different for each partition of each table.
+    String destinationHash =
+        Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
+    if (partition >= 0) {
+      return String.format("%s_%s_%05d", prefix, destinationHash, partition);
+    } else {
+      return String.format("%s_%s", prefix, destinationHash);
+    }
+  }
+
   @VisibleForTesting
   static class JsonSchemaToTableSchema
       implements SerializableFunction<String, TableSchema> {

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 54a25c7..3f5947e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -61,7 +61,6 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelFactory;
@@ -445,7 +444,8 @@ public class BigQueryIO {
       // Note that a table or query check can fail if the table or dataset are created by
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
-      if (getValidate() && table != null) {
+      if (getValidate() && table != null && table.isAccessible() && table.get().getProjectId()
+          != null) {
         checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
         // Check for source table presence for early failure notification.
         DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
@@ -650,6 +650,7 @@ public class BigQueryIO {
   public static <T> Write<T> write() {
     return new AutoValue_BigQueryIO_Write.Builder<T>()
         .setValidate(true)
+        .setTableDescription("")
         .setBigQueryServices(new BigQueryServicesImpl())
         .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
         .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
@@ -690,7 +691,8 @@ public class BigQueryIO {
     @Nullable abstract ValueProvider<String> getJsonSchema();
     abstract CreateDisposition getCreateDisposition();
     abstract WriteDisposition getWriteDisposition();
-    @Nullable abstract String getTableDescription();
+    /** Table description. Default is empty. */
+    abstract String getTableDescription();
     /** An option to indicate if table validation is desired. Default is true. */
     abstract boolean getValidate();
     abstract BigQueryServices getBigQueryServices();
@@ -805,9 +807,6 @@ public class BigQueryIO {
     public Write<T> to(ValueProvider<String> tableSpec) {
       ensureToNotCalledYet();
       String tableDescription = getTableDescription();
-      if (tableDescription == null) {
-        tableDescription = "";
-      }
       return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
@@ -911,7 +910,7 @@ public class BigQueryIO {
     public void validate(PCollection<T> input) {
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
-      // Exactly one of the table and table reference can be configured.
+      // We must have a destination to write to!
       checkState(getTableFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
@@ -972,8 +971,8 @@ public class BigQueryIO {
     @Override
     public WriteResult expand(PCollection<T> input) {
       PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
-          input.apply("PrepareWrite", ParDo.of(
-              new PrepareWrite<T>(getTableFunction(), getFormatFunction())))
+          input.apply("PrepareWrite", new PrepareWrite<T>(
+              getTableFunction(), getFormatFunction()))
               .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
 
 
@@ -1013,8 +1012,8 @@ public class BigQueryIO {
             .withLabel("Table WriteDisposition"))
           .addIfNotDefault(DisplayData.item("validation", getValidate())
             .withLabel("Validation Enabled"), true)
-          .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
-            .withLabel("Table Description"));
+          .addIfNotDefault(DisplayData.item("tableDescription", getTableDescription())
+            .withLabel("Table Description"), "");
     }
 
     /** Returns the table schema. */

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index 22aba64..a28da92 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -109,8 +109,8 @@ class BigQueryTableSource extends BigQuerySourceBase {
   @Override
   public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
     if (tableSizeBytes.get() == null) {
-      TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
-          TableReference.class);
+      TableReference table = setDefaultProjectIfAbsent(options.as(BigQueryOptions.class),
+          BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
 
       Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
           .getTable(table).getNumBytes();

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
index e216553..a78f32d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -1,68 +1,94 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
-
+import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Creates any tables needed before performing streaming writes to the tables. This is a
- * side-effect {l@ink DoFn}, and returns the original collection unchanged.
+ * Creates any tables needed before performing streaming writes to the tables. This is a side-effect
+ * {@link DoFn}, and returns the original collection unchanged.
  */
-public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
-    KV<TableDestination, TableRow>> {
+public class CreateTables
+    extends PTransform<
+        PCollection<KV<TableDestination, TableRow>>, PCollection<KV<TableDestination, TableRow>>> {
   private final CreateDisposition createDisposition;
   private final BigQueryServices bqServices;
   private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
 
-
-  /** The list of tables created so far, so we don't try the creation
-   each time.
-   * TODO: We should put a bound on memory usage of this. Use guava cache instead.
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
    */
   private static Set<String> createdTables =
       Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
-  public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices,
-                      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+  public CreateTables(
+      CreateDisposition createDisposition,
+      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+    this(createDisposition, new BigQueryServicesImpl(), schemaFunction);
+  }
+
+  private CreateTables(
+      CreateDisposition createDisposition,
+      BigQueryServices bqServices,
+      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
     this.createDisposition = createDisposition;
     this.bqServices = bqServices;
     this.schemaFunction = schemaFunction;
   }
 
-  @ProcessElement
-  public void processElement(ProcessContext context) throws InterruptedException, IOException {
-    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-    possibleCreateTable(options, context.element().getKey());
-    context.output(context.element());
+  CreateTables withTestServices(BigQueryServices bqServices) {
+    return new CreateTables(createDisposition, bqServices, schemaFunction);
+  }
+
+  @Override
+  public PCollection<KV<TableDestination, TableRow>> expand(
+      PCollection<KV<TableDestination, TableRow>> input) {
+    return input.apply(
+        ParDo.of(
+            new DoFn<KV<TableDestination, TableRow>, KV<TableDestination, TableRow>>() {
+              @ProcessElement
+              public void processElement(ProcessContext context)
+                  throws InterruptedException, IOException {
+                BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+                possibleCreateTable(options, context.element().getKey());
+                context.output(context.element());
+              }
+            }));
   }
 
   private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
@@ -70,8 +96,7 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
     String tableSpec = tableDestination.getTableSpec();
     TableReference tableReference = tableDestination.getTableReference();
     String tableDescription = tableDestination.getTableDescription();
-    if (createDisposition != createDisposition.CREATE_NEVER
-        && !createdTables.contains(tableSpec)) {
+    if (createDisposition != createDisposition.CREATE_NEVER && !createdTables.contains(tableSpec)) {
       synchronized (createdTables) {
         // Another thread may have succeeded in creating the table in the meanwhile, so
         // check again. This check isn't needed for correctness, but we add it to prevent
@@ -92,6 +117,8 @@ public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
     }
   }
 
+  /** This method is used by the testing fake to clear static state. */
+  @VisibleForTesting
   static void clearCreatedTables() {
     synchronized (createdTables) {
       createdTables.clear();

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
index da3a70a..90d41a0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -39,8 +39,7 @@ class GenerateShardedTable extends DoFn<KV<TableDestination, TableRow>,
   @ProcessElement
   public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
     ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
-    // We output on keys 0-50 to ensure that there's enough batching for
-    // BigQuery.
+    // We output on keys 0-numShards.
     String tableSpec = context.element().getKey().getTableSpec();
     context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)),
         context.element().getValue()));

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index 7712417..a8bdb43 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
@@ -23,6 +23,8 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
@@ -30,37 +32,49 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
- * Prepare an input {@link PCollection} for writing to BigQuery. Use the table-reference
- * function to determine which tables each element is written to, and format the element into a
- * {@link TableRow} using the user-supplied format function.
+ * Prepare an input {@link PCollection} for writing to BigQuery. Use the table function to determine
+ * which tables each element is written to, and format the element into a {@link TableRow} using the
+ * user-supplied format function.
  */
-public class PrepareWrite<T> extends DoFn<T, KV<TableDestination, TableRow>> {
+public class PrepareWrite<T>
+    extends PTransform<PCollection<T>, PCollection<KV<TableDestination, TableRow>>> {
   private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
   private SerializableFunction<T, TableRow> formatFunction;
 
-  public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
-                      SerializableFunction<T, TableRow> formatFunction) {
+  public PrepareWrite(
+      SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
+      SerializableFunction<T, TableRow> formatFunction) {
     this.tableFunction = tableFunction;
     this.formatFunction = formatFunction;
   }
 
-  @ProcessElement
-  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
-    TableDestination tableDestination = tableSpecFromWindowedValue(
-        context.getPipelineOptions().as(BigQueryOptions.class),
-        ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
-    TableRow tableRow = formatFunction.apply(context.element());
-    context.output(KV.of(tableDestination, tableRow));
+  @Override
+  public PCollection<KV<TableDestination, TableRow>> expand(PCollection<T> input) {
+    return input.apply(
+        ParDo.of(
+            new DoFn<T, KV<TableDestination, TableRow>>() {
+              @ProcessElement
+              public void processElement(ProcessContext context, BoundedWindow window)
+                  throws IOException {
+                TableDestination tableDestination =
+                    tableSpecFromWindowedValue(
+                        context.getPipelineOptions().as(BigQueryOptions.class),
+                        ValueInSingleWindow.of(
+                            context.element(), context.timestamp(), window, context.pane()));
+                TableRow tableRow = formatFunction.apply(context.element());
+                context.output(KV.of(tableDestination, tableRow));
+              }
+            }));
   }
 
-  private TableDestination tableSpecFromWindowedValue(BigQueryOptions options,
-                                            ValueInSingleWindow<T> value) {
+  private TableDestination tableSpecFromWindowedValue(
+      BigQueryOptions options, ValueInSingleWindow<T> value) {
     TableDestination tableDestination = tableFunction.apply(value);
     TableReference tableReference = tableDestination.getTableReference();
     if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
       tableReference.setProjectId(options.getProject());
-      tableDestination = new TableDestination(tableReference,
-          tableDestination.getTableDescription());
+      tableDestination =
+          new TableDestination(tableReference, tableDestination.getTableDescription());
     }
     return tableDestination;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
index 09b4fbf..c2b739f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -25,6 +25,7 @@ import java.util.Objects;
  * A key and a shard number.
  */
 class ShardedKey<K> implements Serializable {
+  private static final long serialVersionUID = 1L;
   private final K key;
   private final int shardNumber;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
index ced1d66..efd9c31 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -22,15 +22,10 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -38,8 +33,8 @@ import org.apache.beam.sdk.values.PCollection;
 * PTransform that performs streaming BigQuery write. To increase consistency,
 * it leverages BigQuery best effort de-dup mechanism.
  */
-class StreamingInserts extends PTransform<PCollection<KV<TableDestination, TableRow>>,
-    WriteResult> {
+public class StreamingInserts extends
+    PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
   private final Write<?> write;
 
   private static class ConstantSchemaFunction implements
@@ -74,36 +69,11 @@ class StreamingInserts extends PTransform<PCollection<KV<TableDestination, Table
     SerializableFunction<TableDestination, TableSchema> schemaFunction =
         new ConstantSchemaFunction(write.getSchema());
 
-    // A naive implementation would be to simply stream data directly to BigQuery.
-    // However, this could occasionally lead to duplicated data, e.g., when
-    // a VM that runs this code is restarted and the code is re-run.
+    PCollection<KV<TableDestination, TableRow>> writes = input
+        .apply("CreateTables", new CreateTables(write.getCreateDisposition(), schemaFunction)
+                .withTestServices(write.getBigQueryServices()));
 
-    // The above risk is mitigated in this implementation by relying on
-    // BigQuery built-in best effort de-dup mechanism.
-
-    // To use this mechanism, each input TableRow is tagged with a generated
-    // unique id, which is then passed to BigQuery and used to ignore duplicates.
-    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input
-        .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(),
-            write.getBigQueryServices(), schemaFunction)))
-        // We create 50 keys per BigQuery table to generate output on. This is few enough that we
-        // get good batching into BigQuery's insert calls, and enough that we can max out the
-        // streaming insert quota.
-        .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(50)))
-        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
-        .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
-
-    // To prevent having the same TableRow processed more than once with regenerated
-    // different unique ids, this implementation relies on "checkpointing", which is
-    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
-    // performed by Reshuffle.
-    tagged
-        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
-        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-        .apply("StreamingWrite",
-            ParDo.of(
-                new StreamingWriteFn(write.getBigQueryServices())));
-
-    return WriteResult.in(input.getPipeline());
+    return writes.apply(new StreamingWriteTables()
+        .withTestServices(write.getBigQueryServices()));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
new file mode 100644
index 0000000..4ddc1df
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * This transform takes in key-value pairs of {@link TableRow} entries and the
+ * {@link TableDestination} it should be written to. The BigQuery streaming-write service is used
+ * to stream these writes to the appropriate table.
+ *
+ * <p>This transform assumes that all destination tables already exist by the time it sees a write
+ * for that table.
+ */
+public class StreamingWriteTables extends PTransform<
+    PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+  private BigQueryServices bigQueryServices;
+
+  public StreamingWriteTables() {
+    this(new BigQueryServicesImpl());
+  }
+
+  private StreamingWriteTables(BigQueryServices bigQueryServices) {
+    this.bigQueryServices = bigQueryServices;
+  }
+
+  StreamingWriteTables withTestServices(BigQueryServices bigQueryServices) {
+    return new StreamingWriteTables(bigQueryServices);
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+    // A naive implementation would be to simply stream data directly to BigQuery.
+    // However, this could occasionally lead to duplicated data, e.g., when
+    // a VM that runs this code is restarted and the code is re-run.
+
+    // The above risk is mitigated in this implementation by relying on
+    // BigQuery built-in best effort de-dup mechanism.
+
+    // To use this mechanism, each input TableRow is tagged with a generated
+    // unique id, which is then passed to BigQuery and used to ignore duplicates
+    // We create 50 keys per BigQuery table to generate output on. This is few enough that we
+    // get good batching into BigQuery's insert calls, and enough that we can max out the
+    // streaming insert quota.
+    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
+        input.apply("ShardTableWrites", ParDo.of
+        (new GenerateShardedTable(50)))
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
+        .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
+
+    // To prevent having the same TableRow processed more than once with regenerated
+    // different unique ids, this implementation relies on "checkpointing", which is
+    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
+    // performed by Reshuffle.
+    tagged
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+        .apply("StreamingWrite",
+            ParDo.of(
+                new StreamingWriteFn(bigQueryServices)));
+    return WriteResult.in(input.getPipeline());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 36e1401..962e2cd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -27,6 +27,7 @@ import java.util.Objects;
  * Encapsulates a BigQuery table destination.
  */
 public class TableDestination implements Serializable {
+  private static final long serialVersionUID = 1L;
   private final String tableSpec;
   private final String tableDescription;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
index fa24700..262a00d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -1,20 +1,20 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
@@ -26,20 +26,18 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
-/**
- * A coder for {@link TableDestination} objects.
- */
+/** A coder for {@link TableDestination} objects. */
 public class TableDestinationCoder extends AtomicCoder<TableDestination> {
   private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
-
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
 
   @JsonCreator
   public static TableDestinationCoder of() {
-      return INSTANCE;
-    }
+    return INSTANCE;
+  }
 
   @Override
-    public void encode(TableDestination value, OutputStream outStream, Context context)
+  public void encode(TableDestination value, OutputStream outStream, Context context)
       throws IOException {
     if (value == null) {
       throw new CoderException("cannot encode a null value");
@@ -50,15 +48,13 @@ public class TableDestinationCoder extends AtomicCoder<TableDestination> {
 
   @Override
   public TableDestination decode(InputStream inStream, Context context) throws IOException {
-      return new TableDestination(
-          stringCoder.decode(inStream, context.nested()),
-          stringCoder.decode(inStream, context.nested()));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      return;
-    }
+    return new TableDestination(
+        stringCoder.decode(inStream, context.nested()),
+        stringCoder.decode(inStream, context.nested()));
+  }
 
-    StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    return;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index ee8f466..91ef404 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -32,9 +32,7 @@ import org.apache.beam.sdk.util.MimeTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery.
- */
+/** Writes {@TableRow} objects out to a file. Used when doing batch load jobs into BigQuery. */
 class TableRowWriter {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
 
@@ -47,16 +45,18 @@ class TableRowWriter {
   protected String mimeType = MimeTypes.TEXT;
   private CountingOutputStream out;
 
-  public class Result {
-    String filename;
-    long byteSize;
+  public static final class Result {
+    final String filename;
+    final long byteSize;
+
     public Result(String filename, long byteSize) {
       this.filename = filename;
       this.byteSize = byteSize;
     }
   }
+
   TableRowWriter(String basename) {
-      this.tempFilePrefix = basename;
+    this.tempFilePrefix = basename;
   }
 
   public final void open(String uId) throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
index 7379784..284691e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -28,15 +28,14 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 
 /**
- * Fn that tags each table row with a unique id and destination table.
- * To avoid calling UUID.randomUUID() for each element, which can be costly,
- * a randomUUID is generated only once per bucket of data. The actual unique
- * id is created by concatenating this randomUUID with a sequential number.
+ * Fn that tags each table row with a unique id and destination table. To avoid calling
+ * UUID.randomUUID() for each element, which can be costly, a randomUUID is generated only once per
+ * bucket of data. The actual unique id is created by concatenating this randomUUID with a
+ * sequential number.
  */
 @VisibleForTesting
 class TagWithUniqueIds
     extends DoFn<KV<ShardedKey<String>, TableRow>, KV<ShardedKey<String>, TableRowInfo>> {
-
   private transient String randomUUID;
   private transient long sequenceNo = 0L;
 
@@ -51,8 +50,9 @@ class TagWithUniqueIds
     String uniqueId = randomUUID + sequenceNo++;
     // We output on keys 0-50 to ensure that there's enough batching for
     // BigQuery.
-    context.output(KV.of(context.element().getKey(),
-        new TableRowInfo(context.element().getValue(), uniqueId)));
+    context.output(
+        KV.of(
+            context.element().getKey(), new TableRowInfo(context.element().getValue(), uniqueId)));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 869e68a..a25cc90 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -51,10 +51,11 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
    * and encapsulates the table it is destined to as well as the file byte size.
    */
-  public static class Result implements Serializable {
-    public String filename;
-    public Long fileByteSize;
-    public TableDestination tableDestination;
+  public static final class Result implements Serializable {
+    private static final long serialVersionUID = 1L;
+    public final String filename;
+    public final Long fileByteSize;
+    public final TableDestination tableDestination;
 
     public Result(String filename, Long fileByteSize, TableDestination tableDestination) {
       this.filename = filename;
@@ -68,6 +69,9 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
    */
   public static class ResultCoder extends AtomicCoder<Result> {
     private static final ResultCoder INSTANCE = new ResultCoder();
+    private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+    private static final VarLongCoder longCoder = VarLongCoder.of();
+    private static final TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
 
     public static ResultCoder of() {
       return INSTANCE;
@@ -87,18 +91,15 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
     @Override
     public Result decode(InputStream inStream, Context context)
         throws IOException {
-      return new Result(stringCoder.decode(inStream, context.nested()),
-          longCoder.decode(inStream, context.nested()),
-          tableDestinationCoder.decode(inStream, context.nested()));
+      String filename = stringCoder.decode(inStream, context.nested());
+      long fileByteSize = longCoder.decode(inStream, context.nested());
+      TableDestination tableDestination = tableDestinationCoder.decode(inStream, context.nested());
+      return new Result(filename, fileByteSize, tableDestination);
     }
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
     }
-
-    StringUtf8Coder stringCoder = StringUtf8Coder.of();
-    VarLongCoder longCoder = VarLongCoder.of();
-    TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
   }
 
   WriteBundlesToFiles(String tempFilePrefix) {
@@ -107,6 +108,8 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
 
   @StartBundle
   public void startBundle(Context c) {
+    // This must be done each bundle, as by default the {@link DoFn} might be reused between
+    // bundles.
     this.writers = Maps.newHashMap();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 9c48b82..9414909 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -44,7 +44,65 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
   private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag;
   private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag;
 
-  public WritePartition(
+  private static class PartitionData {
+    private int numFiles = 0;
+    private long byteSize = 0;
+    private List<String> filenames = Lists.newArrayList();
+
+    int getNumFiles() {
+      return numFiles;
+    }
+
+    void addFiles(int numFiles) {
+      this.numFiles += numFiles;
+    }
+
+    long getByteSize() {
+      return byteSize;
+    }
+
+    void addBytes(long numBytes) {
+      this.byteSize += numBytes;
+    }
+
+    List<String> getFilenames() {
+      return  filenames;
+    }
+
+    void addFilename(String filename) {
+      filenames.add(filename);
+    }
+
+    // Check to see whether we can add to this partition without exceeding the maximum partition
+    // size.
+    boolean canAccept(int numFiles, long numBytes) {
+      return this.numFiles + numFiles <= Write.MAX_NUM_FILES
+          && this.byteSize + numBytes <= Write.MAX_SIZE_BYTES;
+    }
+  }
+
+  private static class DestinationData {
+    private List<PartitionData> partitions = Lists.newArrayList();
+
+    DestinationData() {
+      // Always start out with a single empty partition.
+      partitions.add(new PartitionData());
+    }
+
+    List<PartitionData> getPartitions() {
+      return partitions;
+    }
+
+    PartitionData getLatestPartition() {
+      return partitions.get(partitions.size() - 1);
+    }
+
+    void addPartition(PartitionData partition) {
+       partitions.add(partition);
+    }
+  }
+
+  WritePartition(
       ValueProvider<String> singletonOutputJsonTableRef,
       String singletonOutputTableDescription,
       PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
@@ -76,54 +134,41 @@ class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<
     }
 
 
-    long partitionId = 0;
-    Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap();
-    Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap();
-    Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap();
-    for (int i = 0; i < results.size(); ++i) {
-      WriteBundlesToFiles.Result fileResult = results.get(i);
+    Map<TableDestination, DestinationData> currentResults = Maps.newHashMap();
+    for (WriteBundlesToFiles.Result fileResult : results) {
       TableDestination tableDestination = fileResult.tableDestination;
-      List<List<String>> partitions = currResultsMap.get(tableDestination);
-      if (partitions == null) {
-        partitions = Lists.newArrayList();
-        partitions.add(Lists.<String>newArrayList());
-        currResultsMap.put(tableDestination, partitions);
+      DestinationData destinationData = currentResults.get(tableDestination);
+      if (destinationData == null) {
+        destinationData = new DestinationData();
+        currentResults.put(tableDestination, destinationData);
       }
-      int currNumFiles = getOrDefault(currNumFilesMap, tableDestination, 0);
-      long currSizeBytes = getOrDefault(currSizeBytesMap, tableDestination, 0L);
-      if (currNumFiles + 1 > Write.MAX_NUM_FILES
-          || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
-        // Add a new partition for this table.
-        partitions.add(Lists.<String>newArrayList());
-      //  c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
-        currNumFiles = 0;
-        currSizeBytes = 0;
-        currNumFilesMap.remove(tableDestination);
-        currSizeBytesMap.remove(tableDestination);
+
+      PartitionData latestPartition = destinationData.getLatestPartition();
+      if (!latestPartition.canAccept(1, fileResult.fileByteSize)) {
+        // Too much data, roll over to a new partition.
+        latestPartition = new PartitionData();
+        destinationData.addPartition(latestPartition);
       }
-      currNumFilesMap.put(tableDestination, currNumFiles + 1);
-      currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize);
-      // Always add to the most recent partition for this table.
-      partitions.get(partitions.size() - 1).add(fileResult.filename);
+      latestPartition.addFilename(fileResult.filename);
+      latestPartition.addFiles(1);
+      latestPartition.addBytes(fileResult.fileByteSize);
     }
 
-    for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
+    // Now that we've figured out which tables and partitions to write out, emit this information
+    // to the next stage.
+    for (Map.Entry<TableDestination, DestinationData> entry : currentResults.entrySet()) {
       TableDestination tableDestination = entry.getKey();
-      List<List<String>> partitions = entry.getValue();
+      DestinationData destinationData = entry.getValue();
+      // In the fast-path case where we only output one table, the transform loads it directly
+      // to the final table. In this case, we output on a special TupleTag so the enclosing
+      // transform knows to skip the rename step.
       TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag =
-          (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
-      for (int i = 0; i < partitions.size(); ++i) {
-        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), partitions.get(i)));
+          (destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
+      for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
+        PartitionData partitionData = destinationData.getPartitions().get(i);
+        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1),
+            partitionData.getFilenames()));
       }
     }
   }
-
-  private <T> T getOrDefault(Map<TableDestination, T> map, TableDestination tableDestination,
-                     T defaultValue) {
-    if (map.containsKey(tableDestination)) {
-      return map.get(tableDestination);
-    } else {
-      return defaultValue;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 752e7d3..9b1c989 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -89,8 +89,9 @@ class WriteRename extends DoFn<String, Void> {
       }
 
       // Make sure each destination table gets a unique job id.
-      String jobIdPrefix = String.format(
-          c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode());
+      String jobIdPrefix = BigQueryHelpers.createJobId(
+          c.sideInput(jobIdToken), finalTableDestination, -1);
+
       copy(
           bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
           bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index f7fe87b..4a6cd2b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -57,11 +57,15 @@ import org.slf4j.LoggerFactory;
 /**
  * Writes partitions to BigQuery tables.
  *
- * <p>The input is a list of files corresponding to a partition of a table. These files are
+ * <p>The input is a list of files corresponding to each partition of a table. These files are
  * load into a temporary table (or into the final table if there is only one partition). The output
- * is a {@link KV} mapping the final table to the temporary tables for each partition of that table.
+ * is a {@link KV} mapping each final table to a list of the temporary tables containing its data.
+ *
+ * <p>In the case where all the data in the files fit into a single load job, this transform loads
+ * the data directly into the final table, skipping temporary tables. In this case, the output
+ * {@link KV} maps the final table to itself.
  */
-class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, List<String>>,
     KV<TableDestination, String>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
@@ -94,10 +98,9 @@ class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<St
   public void processElement(ProcessContext c) throws Exception {
     TableDestination tableDestination = c.element().getKey().getKey();
     Integer partition = c.element().getKey().getShardNumber();
-    List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
-    // Job ID must be different for each partition of each table.
-    String jobIdPrefix = String.format(
-        c.sideInput(jobIdToken) + "_0x%08x_%05d", tableDestination.hashCode(), partition);
+    List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+    String jobIdPrefix = BigQueryHelpers.createJobId(
+        c.sideInput(jobIdToken), tableDestination, partition);
 
     TableReference ref = tableDestination.getTableReference();
     if (!singlePartition) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7d13061c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f10be13..d0004e4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
@@ -122,7 +123,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -607,13 +607,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testStreamingWriteWithDynamicTables() throws Exception {
     testWriteWithDynamicTables(true);
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testBatchWriteWithDynamicTables() throws Exception {
     testWriteWithDynamicTables(false);
   }
@@ -842,7 +840,7 @@ public class BigQueryIOTest implements Serializable {
             BigQueryIO.writeTableRows().to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -894,7 +892,7 @@ public class BigQueryIOTest implements Serializable {
         null,
         CreateDisposition.CREATE_IF_NEEDED,
         WriteDisposition.WRITE_EMPTY,
-        null,
+        "",
         false);
   }
 
@@ -905,7 +903,7 @@ public class BigQueryIOTest implements Serializable {
     checkWriteObject(
         write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
-        null);
+        "");
   }
 
   @Test
@@ -917,7 +915,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to(table);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -927,7 +925,7 @@ public class BigQueryIOTest implements Serializable {
         BigQueryIO.<TableRow>write().to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -937,7 +935,7 @@ public class BigQueryIOTest implements Serializable {
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -947,7 +945,7 @@ public class BigQueryIOTest implements Serializable {
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -957,7 +955,7 @@ public class BigQueryIOTest implements Serializable {
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, "");
   }
 
   @Test
@@ -967,7 +965,7 @@ public class BigQueryIOTest implements Serializable {
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, "");
   }
 
   @Test
@@ -977,7 +975,7 @@ public class BigQueryIOTest implements Serializable {
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, "");
   }
 
   @Test
@@ -1359,7 +1357,6 @@ public class BigQueryIOTest implements Serializable {
     SourceTestUtils.assertSplitAtFractionBehavior(
         bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
 
-
     List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
     assertEquals(2, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
@@ -1626,9 +1623,11 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
         new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
+    PCollection<WriteBundlesToFiles.Result> filesPCollection =
+        p.apply(Create.of(files).withType(new TypeDescriptor<WriteBundlesToFiles.Result>() {}));
     PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
         PCollectionViews.iterableView(
-        p,
+        filesPCollection,
         WindowingStrategy.globalDefault(),
         WriteBundlesToFiles.ResultCoder.of());
 
@@ -1699,14 +1698,12 @@ public class BigQueryIOTest implements Serializable {
 
     Path baseDir = Files.createTempDirectory(tempFolder, "testWriteTables");
 
-    List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
-        Lists.newArrayList();
+    List<KV<ShardedKey<TableDestination>, List<String>>> partitions = Lists.newArrayList();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
       for (int j = 0; j < numPartitions; ++j) {
-        String tempTableId = String.format(
-            jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
+        String tempTableId = BigQueryHelpers.createJobId(jobIdToken, tableDestination, j);
         List<String> filesPerPartition = Lists.newArrayList();
         for (int k = 0; k < numFilesPerPartition; ++k) {
           String filename = Paths.get(baseDir.toString(),
@@ -1721,7 +1718,7 @@ public class BigQueryIOTest implements Serializable {
           filesPerPartition.add(filename);
         }
         partitions.add(KV.of(ShardedKey.of(tableDestination, j),
-            (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+            filesPerPartition));
 
         List<String> expectedTables = expectedTempTables.get(tableDestination);
         if (expectedTables == null) {
@@ -1735,11 +1732,6 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
-    PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables));
-    PCollectionView<Iterable<String>> tempTablesView = PCollectionViews.iterableView(
-        expectedTempTablesPCollection,
-        WindowingStrategy.globalDefault(),
-        StringUtf8Coder.of());
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =
         jobIdTokenCollection.apply(View.<String>asSingleton());
@@ -1753,10 +1745,10 @@ public class BigQueryIOTest implements Serializable {
         CreateDisposition.CREATE_IF_NEEDED,
         null);
 
-    DoFnTester<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+    DoFnTester<KV<ShardedKey<TableDestination>, List<String>>,
         KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    for (KV<ShardedKey<TableDestination>, Iterable<List<String>>> partition : partitions) {
+    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
       tester.processElement(partition);
     }
 
@@ -1848,11 +1840,27 @@ public class BigQueryIOTest implements Serializable {
       }
     }
 
+    PCollection<KV<TableDestination, String>> tempTablesPCollection =
+        p.apply(Create.of(tempTables)
+            .withCoder(KvCoder.of(TableDestinationCoder.of(),
+                IterableCoder.of(StringUtf8Coder.of()))))
+            .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
+                KV<TableDestination, String>>() {
+              @ProcessElement
+              public void processElement(ProcessContext c) {
+                TableDestination tableDestination = c.element().getKey();
+                for (String tempTable : c.element().getValue()) {
+                  c.output(KV.of(tableDestination, tempTable));
+                }
+              }
+            }));
+
     PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
         PCollectionViews.multimapView(
-        p,
+            tempTablesPCollection,
         WindowingStrategy.globalDefault(),
-        KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()));
+        KvCoder.of(TableDestinationCoder.of(),
+            StringUtf8Coder.of()));
 
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =