You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:46 UTC

[12/50] [abbrv] beam git commit: Refactor batch load job path, and add support for data-dependent tables.

Refactor batch load job path, and add support for data-dependent tables.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8581caf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8581caf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8581caf3

Branch: refs/heads/DSL_SQL
Commit: 8581caf388ad688a0e79cfa154262d1e701dee10
Parents: 58ed5c7
Author: Reuven Lax <re...@google.com>
Authored: Wed Mar 29 07:34:10 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  | 180 ----------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 203 +++++++++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |   3 +-
 .../sdk/io/gcp/bigquery/TableDestination.java   |  17 +-
 .../sdk/io/gcp/bigquery/TableRowWriter.java     |  12 +-
 .../beam/sdk/io/gcp/bigquery/WriteBundles.java  |  82 --------
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 102 ++++++++++
 .../sdk/io/gcp/bigquery/WritePartition.java     |  95 ++++++---
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |  63 +++---
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  47 ++---
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  27 +--
 11 files changed, 469 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
deleted file mode 100644
index 160b231..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
- */
-class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
-  BigQueryIO.Write<T> write;
-
-  BatchLoadBigQuery(BigQueryIO.Write<T> write) {
-    this.write = write;
-  }
-
-  @Override
-  public WriteResult expand(PCollection<T> input) {
-    Pipeline p = input.getPipeline();
-    BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
-
-    final String stepUuid = BigQueryHelpers.randomUUIDString();
-
-    String tempLocation = options.getTempLocation();
-    String tempFilePrefix;
-    try {
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-      tempFilePrefix = factory.resolve(
-          factory.resolve(tempLocation, "BigQueryWriteTemp"),
-          stepUuid);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-          e);
-    }
-
-    // Create a singleton job ID token at execution time.
-    PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-    PCollectionView<String> jobIdTokenView = p
-        .apply("TriggerIdCreation", Create.of("ignored"))
-        .apply("CreateJobId", MapElements.via(
-            new SimpleFunction<String, String>() {
-              @Override
-              public String apply(String input) {
-                return stepUuid;
-              }
-            }))
-        .apply(View.<String>asSingleton());
-
-    PCollection<T> typedInputInGlobalWindow =
-        input.apply(
-            Window.<T>into(new GlobalWindows())
-                .triggering(DefaultTrigger.of())
-                .discardingFiredPanes());
-    // Avoid applying the formatFunction if it is the identity formatter.
-    PCollection<TableRow> inputInGlobalWindow;
-    if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
-      inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
-    } else {
-      inputInGlobalWindow =
-          typedInputInGlobalWindow.apply(
-              MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction()));
-    }
-
-    // PCollection of filename, file byte size.
-    PCollection<KV<String, Long>> results = inputInGlobalWindow
-        .apply("WriteBundles",
-            ParDo.of(new WriteBundles(tempFilePrefix)));
-
-    TupleTag<KV<Long, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<Long, List<String>>> singlePartitionTag =
-        new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
-    // Turn the list of files and record counts in a PCollectionView that can be used as a
-    // side input.
-    PCollectionView<Iterable<KV<String, Long>>> resultsView = results
-        .apply("ResultsView", View.<KV<String, Long>>asIterable());
-    PCollectionTuple partitions = singleton.apply(ParDo
-        .of(new WritePartition(
-            resultsView,
-            multiPartitionsTag,
-            singlePartitionTag))
-        .withSideInputs(resultsView)
-        .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
-    // If WriteBundles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
-    // the import needs to be split into multiple partitions, and those partitions will be
-    // specified in multiPartitionsTag.
-    PCollection<String> tempTables = partitions.get(multiPartitionsTag)
-        .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
-        .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
-            false,
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            tempFilePrefix,
-            NestedValueProvider.of(table, new TableRefToJson()),
-            write.getJsonSchema(),
-            WriteDisposition.WRITE_EMPTY,
-            CreateDisposition.CREATE_IF_NEEDED,
-            write.getTableDescription()))
-            .withSideInputs(jobIdTokenView));
-
-    PCollectionView<Iterable<String>> tempTablesView = tempTables
-        .apply("TempTablesView", View.<String>asIterable());
-    singleton.apply(ParDo
-        .of(new WriteRename(
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            NestedValueProvider.of(table, new TableRefToJson()),
-            write.getWriteDisposition(),
-            write.getCreateDisposition(),
-            tempTablesView,
-            write.getTableDescription()))
-        .withSideInputs(tempTablesView, jobIdTokenView));
-
-    // Write single partition to final table
-    partitions.get(singlePartitionTag)
-        .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
-        .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
-            true,
-            write.getBigQueryServices(),
-            jobIdTokenView,
-            tempFilePrefix,
-            NestedValueProvider.of(table, new TableRefToJson()),
-            write.getJsonSchema(),
-            write.getWriteDisposition(),
-            write.getCreateDisposition(),
-            write.getTableDescription()))
-            .withSideInputs(jobIdTokenView));
-
-    return WriteResult.in(input.getPipeline());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
new file mode 100644
index 0000000..8594211
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+
+/**
+ * PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery.
+ */
+class BatchLoads<T> extends
+    PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+  BigQueryIO.Write<T> write;
+
+  private static class ConstantSchemaFunction implements
+      SerializableFunction<TableDestination, TableSchema> {
+    private final @Nullable
+    String jsonSchema;
+
+    ConstantSchemaFunction(TableSchema schema) {
+      this.jsonSchema = BigQueryHelpers.toJsonString(schema);
+    }
+
+    @Override
+    @Nullable
+    public TableSchema apply(TableDestination table) {
+      return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    }
+  }
+
+  BatchLoads(BigQueryIO.Write<T> write) {
+    this.write = write;
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+    Pipeline p = input.getPipeline();
+    BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+    ValueProvider<TableReference> table = write.getTableWithDefaultProject(options);
+
+    final String stepUuid = BigQueryHelpers.randomUUIDString();
+
+    String tempLocation = options.getTempLocation();
+    String tempFilePrefix;
+    try {
+      IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+      tempFilePrefix = factory.resolve(
+          factory.resolve(tempLocation, "BigQueryWriteTemp"),
+          stepUuid);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+          e);
+    }
+
+    // Create a singleton job ID token at execution time. This will be used as the base for all
+    // load jobs issued from this instance of the transfomr.
+    PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+    PCollectionView<String> jobIdTokenView = p
+        .apply("TriggerIdCreation", Create.of("ignored"))
+        .apply("CreateJobId", MapElements.via(
+            new SimpleFunction<String, String>() {
+              @Override
+              public String apply(String input) {
+                return stepUuid;
+              }
+            }))
+        .apply(View.<String>asSingleton());
+
+    PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
+        input.apply(
+            Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes());
+
+    // PCollection of filename, file byte size, and table destination.
+    PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow
+        .apply("WriteBundlesToFiles",
+            ParDo.of(new WriteBundlesToFiles(tempFilePrefix)));
+
+    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+
+    // Turn the list of files and record counts in a PCollectionView that can be used as a
+    // side input.
+    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView = results
+        .apply("ResultsView", View.<WriteBundlesToFiles.Result>asIterable());
+    // This transform will look at the set of files written for each table, and if any table has
+    // too many files or bytes, will partition that table's files into multiple partitions for
+    // loading.
+    PCollectionTuple partitions = singleton.apply(ParDo
+        .of(new WritePartition(
+            write.getTable(),
+            write.getTableDescription(),
+            resultsView,
+            multiPartitionsTag,
+            singlePartitionTag))
+        .withSideInputs(resultsView)
+        .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+    // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
+    // schema function here. If no schema is specified, this function will return null.
+    SerializableFunction<TableDestination, TableSchema> schemaFunction =
+        new ConstantSchemaFunction(write.getSchema());
+
+    // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
+    // the import needs to be split into multiple partitions, and those partitions will be
+    // specified in multiPartitionsTag.
+    PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag)
+        // What's this GroupByKey for? Is this so we have a deterministic temp tables? If so, maybe
+        // Reshuffle is better here.
+        .apply("MultiPartitionsGroupByKey",
+            GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+        .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+            false,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED,
+            schemaFunction))
+            .withSideInputs(jobIdTokenView));
+
+    // This view maps each final table destination to the set of temporary partitioned tables
+    // the PCollection was loaded into.
+    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables
+        .apply("TempTablesView", View.<TableDestination, String>asMultimap());
+
+    singleton.apply(ParDo
+        .of(new WriteRename(
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            tempTablesView,
+            write.getTableDescription()))
+        .withSideInputs(tempTablesView, jobIdTokenView));
+
+    // Write single partition to final table
+    partitions.get(singlePartitionTag)
+        .apply("SinglePartitionGroupByKey",
+            GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+        .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+            true,
+            write.getBigQueryServices(),
+            jobIdTokenView,
+            tempFilePrefix,
+            write.getWriteDisposition(),
+            write.getCreateDisposition(),
+            schemaFunction))
+            .withSideInputs(jobIdTokenView));
+
+    return WriteResult.in(input.getPipeline());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index af19b83..f1baaf7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -984,7 +984,8 @@ public class BigQueryIO {
       if (input.isBounded() == IsBounded.UNBOUNDED) {
         return rowsWithDestination.apply(new StreamingInserts(this));
       } else {
-        return input.apply(new BatchLoadBigQuery<T>(this));
+
+        return rowsWithDestination.apply(new BatchLoads<T>(this));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 631afeb..1c2b256 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
 
+import java.util.Objects;
+
 /**
  * Encapsulates a BigQuery table destination.
  */
@@ -42,7 +44,6 @@ public class TableDestination {
     return tableSpec;
   }
 
-
   public TableReference getTableReference() {
     return BigQueryHelpers.parseTableSpec(tableSpec);
   }
@@ -50,4 +51,18 @@ public class TableDestination {
   public String getTableDescription() {
     return tableDescription;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof TableDestination)) {
+      return false;
+    }
+    TableDestination other = (TableDestination) o;
+    return tableSpec == other.tableSpec && tableDescription == other.tableDescription;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(tableSpec, tableDescription);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
index 014c498..a1f6153 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowWriter.java
@@ -48,6 +48,14 @@ class TableRowWriter {
   protected String mimeType = MimeTypes.TEXT;
   private CountingOutputStream out;
 
+  public class Result {
+    String filename;
+    long byteSize;
+    public Result(String filename, long byteSize) {
+      this.filename = filename;
+      this.byteSize = byteSize;
+    }
+  }
   TableRowWriter(String basename) {
     this.tempFilePrefix = basename;
   }
@@ -77,8 +85,8 @@ class TableRowWriter {
     out.write(NEWLINE);
   }
 
-  public final KV<String, Long> close() throws IOException {
+  public final Result close() throws IOException {
     channel.close();
-    return KV.of(fileName, out.getCount());
+    return new Result(fileName, out.getCount());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
deleted file mode 100644
index 6219226..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundles.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableRow;
-import java.util.UUID;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.KV;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Writes each bundle of {@link TableRow} elements out to a separate file using
- * {@link TableRowWriter}.
- */
-class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
-  private static final Logger LOG = LoggerFactory.getLogger(WriteBundles.class);
-
-  private transient TableRowWriter writer = null;
-  private final String tempFilePrefix;
-
-  WriteBundles(String tempFilePrefix) {
-    this.tempFilePrefix = tempFilePrefix;
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    if (writer == null) {
-      writer = new TableRowWriter(tempFilePrefix);
-      writer.open(UUID.randomUUID().toString());
-      LOG.debug("Done opening writer {}", writer);
-    }
-    try {
-      writer.write(c.element());
-    } catch (Exception e) {
-      // Discard write result and close the write.
-      try {
-        writer.close();
-        // The writer does not need to be reset, as this DoFn cannot be reused.
-      } catch (Exception closeException) {
-        // Do not mask the exception that caused the write to fail.
-        e.addSuppressed(closeException);
-      }
-      throw e;
-    }
-  }
-
-  @FinishBundle
-  public void finishBundle(Context c) throws Exception {
-    if (writer != null) {
-      c.output(writer.close());
-      writer = null;
-    }
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-
-    builder
-        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-            .withLabel("Temporary File Prefix"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
new file mode 100644
index 0000000..4e6167b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+
+import java.util.Map;
+import java.util.UUID;
+
+import com.google.common.collect.Maps;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes each bundle of {@link TableRow} elements out to a separate file using
+ * {@link TableRowWriter}.
+ */
+class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBundlesToFiles.Result> {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteBundlesToFiles.class);
+
+  // Map from tablespec to a writer for that table.
+  private transient Map<TableDestination, TableRowWriter> writers;
+  private final String tempFilePrefix;
+
+  public static class Result {
+    public String filename;
+    public Long fileByteSize;
+    public TableDestination tableDestination;
+
+    public Result(String filename, Long fileByteSize, TableDestination tableDestination) {
+      this.filename = filename;
+      this.fileByteSize = fileByteSize;
+      this.tableDestination = tableDestination;
+    }
+  }
+  WriteBundlesToFiles(String tempFilePrefix) {
+    this.tempFilePrefix = tempFilePrefix;
+    this.writers = Maps.newHashMap();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    // ??? can we assume Java8?
+    TableRowWriter writer = writers.getOrDefault(c.element().getKey(), null);
+    if (writer == null) {
+      writer = new TableRowWriter(tempFilePrefix);
+      writer.open(UUID.randomUUID().toString());
+      writers.put(c.element().getKey(), writer);
+      LOG.debug("Done opening writer {}", writer);
+    }
+    try {
+      writer.write(c.element().getValue());
+    } catch (Exception e) {
+      // Discard write result and close the write.
+      try {
+        writer.close();
+        // The writer does not need to be reset, as this DoFn cannot be reused.
+      } catch (Exception closeException) {
+        // Do not mask the exception that caused the write to fail.
+        e.addSuppressed(closeException);
+      }
+      throw e;
+    }
+  }
+
+  @FinishBundle
+  public void finishBundle(Context c) throws Exception {
+    for (Map.Entry<TableDestination, TableRowWriter> entry : writers.entrySet()) {
+      TableRowWriter.Result result = entry.getValue().close();
+      c.output(new Result(result.filename, result.byteSize, entry.getKey()));
+    }
+    writers.clear();
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+
+    builder
+        .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+            .withLabel("Temporary File Prefix"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 1b6492e..8e1b16d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -18,27 +18,40 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * Partitions temporary files based on number of files and file sizes.
+ * Partitions temporary files based on number of files and file sizes. Output key is a pair of
+ * tablespec and the list of files corresponding to each partition of that table.
  */
-class WritePartition extends DoFn<String, KV<Long, List<String>>> {
-  private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
-  private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
-  private TupleTag<KV<Long, List<String>>> singlePartitionTag;
+class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List<String>>> {
+  private final ValueProvider<TableReference> singletonOutputTable;
+  private final String singletonOutputTableDescription;
+  private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView;
+  private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag;
+  private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag;
 
   public WritePartition(
-      PCollectionView<Iterable<KV<String, Long>>> resultsView,
-      TupleTag<KV<Long, List<String>>> multiPartitionsTag,
-      TupleTag<KV<Long, List<String>>> singlePartitionTag) {
+      ValueProvider<TableReference> singletonOutputTable,
+      String singletonOutputTableDescription,
+      PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
+      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag,
+      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag) {
+    this.singletonOutputTable = singletonOutputTable;
+    this.singletonOutputTableDescription = singletonOutputTableDescription;
     this.resultsView = resultsView;
     this.multiPartitionsTag = multiPartitionsTag;
     this.singlePartitionTag = singlePartitionTag;
@@ -46,34 +59,62 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> {
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    List<KV<String, Long>> results = Lists.newArrayList(c.sideInput(resultsView));
-    if (results.isEmpty()) {
-      TableRowWriter writer = new TableRowWriter(c.element());
-      writer.open(UUID.randomUUID().toString());
-      results.add(writer.close());
+    List<WriteBundlesToFiles.Result> results = Lists.newArrayList(c.sideInput(resultsView));
+
+    // If there are no elements to write _and_ the user specified a constant output table, then
+    // generate an empty table of that name.
+    if (results.isEmpty() && singletonOutputTable != null) {
+      TableReference singletonTable = singletonOutputTable.get();
+      if (singletonTable != null) {
+        TableRowWriter writer = new TableRowWriter(c.element());
+        writer.open(UUID.randomUUID().toString());
+        TableRowWriter.Result writerResult = writer.close();
+        results.add(new Result(writerResult.filename, writerResult.byteSize,
+            new TableDestination(singletonTable, singletonOutputTableDescription)));
+      }
     }
 
+
     long partitionId = 0;
-    int currNumFiles = 0;
-    long currSizeBytes = 0;
-    List<String> currResults = Lists.newArrayList();
+    Map<TableDestination, Integer> currNumFilesMap = Maps.newHashMap();
+    Map<TableDestination, Long> currSizeBytesMap = Maps.newHashMap();
+    Map<TableDestination, List<List<String>>> currResultsMap = Maps.newHashMap();
     for (int i = 0; i < results.size(); ++i) {
-      KV<String, Long> fileResult = results.get(i);
+      WriteBundlesToFiles.Result fileResult = results.get(i);
+      TableDestination tableDestination = fileResult.tableDestination;
+      // JAVA8
+      List<List<String>> partitions = currResultsMap.getOrDefault(tableDestination, null);
+      if (partitions == null) {
+        partitions = Lists.newArrayList();
+        partitions.add(Lists.<String>newArrayList());
+        currResultsMap.put(tableDestination, partitions);
+      }
+      int currNumFiles = currNumFilesMap.getOrDefault(tableDestination, 0);
+      long currSizeBytes = currSizeBytesMap.getOrDefault(tableDestination, 0L);
       if (currNumFiles + 1 > Write.MAX_NUM_FILES
-          || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
-        c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
-        currResults = Lists.newArrayList();
+          || currSizeBytes + fileResult.fileByteSize > Write.MAX_SIZE_BYTES) {
+        // Add a new partition for this table.
+        partitions.add(Lists.<String>newArrayList());
+      //  c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
         currNumFiles = 0;
         currSizeBytes = 0;
+        currNumFilesMap.remove(tableDestination);
+        currSizeBytesMap.remove(tableDestination);
       }
-      ++currNumFiles;
-      currSizeBytes += fileResult.getValue();
-      currResults.add(fileResult.getKey());
+      currNumFilesMap.put(tableDestination, currNumFiles + 1);
+      currSizeBytesMap.put(tableDestination, currSizeBytes + fileResult.fileByteSize);
+      // Always add to the most recent partition for this table.
+      partitions.get(partitions.size() - 1).add(fileResult.filename);
     }
-    if (partitionId == 0) {
-      c.output(singlePartitionTag, KV.of(++partitionId, currResults));
-    } else {
-      c.output(multiPartitionsTag, KV.of(++partitionId, currResults));
+
+    for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
+      TableDestination tableDestination = entry.getKey();
+      List<List<String>> partitions = entry.getValue();
+      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> outputTag =
+          (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
+      for (int i = 0; i < partitions.size(); ++i) {
+        c.output(outputTag, KV.of(KV.of(tableDestination, i + 1), partitions.get(i)));
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 8cb9439..fbfb290 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import avro.shaded.com.google.common.collect.Maps;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
@@ -25,6 +26,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
@@ -49,24 +51,21 @@ class WriteRename extends DoFn<String, Void> {
 
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
-  private final ValueProvider<String> jsonTableRef;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
-  private final PCollectionView<Iterable<String>> tempTablesView;
+  private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView;
   @Nullable
   private final String tableDescription;
 
   public WriteRename(
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
-      ValueProvider<String> jsonTableRef,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
-      PCollectionView<Iterable<String>> tempTablesView,
+      PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView,
       @Nullable String tableDescription) {
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
-    this.jsonTableRef = jsonTableRef;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
     this.tempTablesView = tempTablesView;
@@ -75,30 +74,40 @@ class WriteRename extends DoFn<String, Void> {
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    List<String> tempTablesJson = Lists.newArrayList(c.sideInput(tempTablesView));
+    Map<TableDestination, Iterable<String>> tempTablesMap =
+        Maps.newHashMap(c.sideInput(tempTablesView));
 
-    // Do not copy if no temp tables are provided
-    if (tempTablesJson.size() == 0) {
-      return;
-    }
+    // Process each destination table.
+    for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) {
+      TableDestination finalTableDestination = entry.getKey();
+      List<String> tempTablesJson = Lists.newArrayList(entry.getValue());
+      // Do not copy if no temp tables are provided
+      if (tempTablesJson.size() == 0) {
+        return;
+      }
+
+      List<TableReference> tempTables = Lists.newArrayList();
+      for (String table : tempTablesJson) {
+        tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
+      }
+
+      // Make sure each destination table gets a unique job id.
+      String jobIdPrefix = String.format(
+          c.sideInput(jobIdToken) + "0x%08x", finalTableDestination.hashCode());
+      copy(
+          bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+          jobIdPrefix,
+          finalTableDestination.getTableReference(),
+          tempTables,
+          writeDisposition,
+          createDisposition,
+          tableDescription);
 
-    List<TableReference> tempTables = Lists.newArrayList();
-    for (String table : tempTablesJson) {
-      tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
+      DatasetService tableService =
+          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+      removeTemporaryTables(tableService, tempTables);
     }
-    copy(
-        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-        c.sideInput(jobIdToken),
-        BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
-        tempTables,
-        writeDisposition,
-        createDisposition,
-        tableDescription);
-
-    DatasetService tableService =
-        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
-    removeTemporaryTables(tableService, tempTables);
   }
 
   private void copy(
@@ -170,8 +179,6 @@ class WriteRename extends DoFn<String, Void> {
     super.populateDisplayData(builder);
 
     builder
-        .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
-            .withLabel("Table Reference"))
         .add(DisplayData.item("writeDisposition", writeDisposition.toString())
             .withLabel("Write Disposition"))
         .add(DisplayData.item("createDisposition", createDisposition.toString())

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 29680ad..5051c95 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.FileIOChannelFactory;
 import org.apache.beam.sdk.util.GcsIOChannelFactory;
@@ -57,48 +58,45 @@ import org.slf4j.LoggerFactory;
 /**
  * Writes partitions to BigQuery tables.
  */
-class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
+class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<String>>>,
+    KV<TableDestination, String>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
   private final boolean singlePartition;
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
   private final String tempFilePrefix;
-  private final ValueProvider<String> jsonTableRef;
-  private final ValueProvider<String> jsonSchema;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
-  @Nullable
-  private final String tableDescription;
+  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
 
   public WriteTables(
       boolean singlePartition,
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       String tempFilePrefix,
-      ValueProvider<String> jsonTableRef,
-      ValueProvider<String> jsonSchema,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
-      @Nullable String tableDescription) {
+      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.tempFilePrefix = tempFilePrefix;
-    this.jsonTableRef = jsonTableRef;
-    this.jsonSchema = jsonSchema;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
-    this.tableDescription = tableDescription;
+    this.schemaFunction = schemaFunction;
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
+    TableDestination tableDestination = c.element().getKey().getKey();
+    Integer partition = c.element().getKey().getValue();
+    List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
+    // Job ID must be different for each partition of each table.
     String jobIdPrefix = String.format(
-        c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
-    TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
-        TableReference.class);
+        c.sideInput(jobIdToken) + "0x%08x_%05d", tableDestination.hashCode(), partition);
+
+    TableReference ref = tableDestination.getTableReference();
     if (!singlePartition) {
       ref.setTableId(jobIdPrefix);
     }
@@ -108,15 +106,14 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
         jobIdPrefix,
         ref,
-        BigQueryHelpers.fromJsonString(
-            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
-        partition,
+        schemaFunction.apply(tableDestination),
+        partitionFiles,
         writeDisposition,
         createDisposition,
-        tableDescription);
-    c.output(BigQueryHelpers.toJsonString(ref));
+        tableDestination.getTableDescription());
+    c.output(KV.of(tableDestination, BigQueryHelpers.toJsonString(ref)));
 
-    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
+    removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partitionFiles);
   }
 
   private void load(
@@ -202,12 +199,6 @@ class WriteTables extends DoFn<KV<Long, Iterable<List<String>>>, String> {
 
     builder
         .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-            .withLabel("Temporary File Prefix"))
-        .addIfNotNull(DisplayData.item("jsonTableRef", jsonTableRef)
-            .withLabel("Table Reference"))
-        .addIfNotNull(DisplayData.item("jsonSchema", jsonSchema)
-            .withLabel("Table Schema"))
-        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-            .withLabel("Table Description"));
+            .withLabel("Temporary File Prefix"));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/8581caf3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d953edd..af39483 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -2078,26 +2078,27 @@ public class BigQueryIOTest implements Serializable {
       files.add(KV.of(fileName, fileSize));
     }
 
-    TupleTag<KV<Long, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<Long, List<String>>> singlePartitionTag =
-        new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
-    PCollection<KV<String, Long>> filesPCollection =
-        p.apply(Create.of(files).withType(new TypeDescriptor<KV<String, Long>>() {}));
-    PCollectionView<Iterable<KV<String, Long>>> filesView = PCollectionViews.iterableView(
-        filesPCollection,
+    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+
+    PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
+        PCollectionViews.iterableView(
+        p,
         WindowingStrategy.globalDefault(),
         KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
 
     WritePartition writePartition =
-        new WritePartition(filesView, multiPartitionsTag, singlePartitionTag);
+        new WritePartition(null, null, resultsView,
+            multiPartitionsTag, singlePartitionTag);
 
-    DoFnTester<String, KV<Long, List<String>>> tester = DoFnTester.of(writePartition);
-    tester.setSideInput(filesView, GlobalWindow.INSTANCE, files);
+    DoFnTester<String, KV<KV<TableDestination, Integer>, List<String>>> tester =
+        DoFnTester.of(writePartition);
+    tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
     tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
-    List<KV<Long, List<String>>> partitions;
+    List<KV<KV<TableDestination, Integer>, List<String>>> partitions;
     if (expectedNumPartitions > 1) {
       partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {