You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:52 UTC

[18/50] [abbrv] beam git commit: Refactor batch loads, and add support for windowed writes.

Refactor batch loads, and add support for windowed writes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/760a9458
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/760a9458
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/760a9458

Branch: refs/heads/DSL_SQL
Commit: 760a94580d7561bb63a3eea67d8e5443c233a541
Parents: 8581caf
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 31 11:19:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/IOChannelUtils.java    |   9 +
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |  49 +-
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |  24 +-
 .../sdk/io/gcp/bigquery/TableDestination.java   |  10 +-
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  54 +-
 .../sdk/io/gcp/bigquery/WritePartition.java     |  28 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   |  13 +-
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  14 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 838 +++++--------------
 .../io/gcp/bigquery/FakeBigQueryServices.java   |  96 +++
 .../sdk/io/gcp/bigquery/FakeDatasetService.java | 172 ++++
 .../sdk/io/gcp/bigquery/FakeJobService.java     | 273 ++++++
 .../sdk/io/gcp/bigquery/TableContainer.java     |  36 +
 13 files changed, 948 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index ea53527..9d3dd23 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Sets;
 import com.google.common.collect.TreeMultimap;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.text.DecimalFormat;
 import java.util.Arrays;
@@ -181,6 +182,14 @@ public class IOChannelUtils {
   }
 
   /**
+   * Creates a read channel for the given filename.
+   */
+  public static ReadableByteChannel open(String filename)
+      throws IOException {
+    return getFactory(filename).open(filename);
+  }
+
+  /**
    * Creates a write channel for the given file components.
    *
    * <p>If numShards is specified, then a ShardingWritableByteChannel is

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 8594211..5e80fae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -26,6 +26,10 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
@@ -61,16 +65,17 @@ class BatchLoads<T> extends
   private static class ConstantSchemaFunction implements
       SerializableFunction<TableDestination, TableSchema> {
     private final @Nullable
-    String jsonSchema;
+    ValueProvider<String> jsonSchema;
 
-    ConstantSchemaFunction(TableSchema schema) {
-      this.jsonSchema = BigQueryHelpers.toJsonString(schema);
+    ConstantSchemaFunction(ValueProvider<String> jsonSchema) {
+      this.jsonSchema = jsonSchema;
     }
 
     @Override
     @Nullable
     public TableSchema apply(TableDestination table) {
-      return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+      return BigQueryHelpers.fromJsonString(
+          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
     }
   }
 
@@ -114,7 +119,7 @@ class BatchLoads<T> extends
         .apply(View.<String>asSingleton());
 
     PCollection<KV<TableDestination, TableRow>> inputInGlobalWindow =
-        input.apply(
+        input.apply("rewindowIntoGlobal",
             Window.<KV<TableDestination, TableRow>>into(new GlobalWindows())
                 .triggering(DefaultTrigger.of())
                 .discardingFiredPanes());
@@ -122,12 +127,13 @@ class BatchLoads<T> extends
     // PCollection of filename, file byte size, and table destination.
     PCollection<WriteBundlesToFiles.Result> results = inputInGlobalWindow
         .apply("WriteBundlesToFiles",
-            ParDo.of(new WriteBundlesToFiles(tempFilePrefix)));
+            ParDo.of(new WriteBundlesToFiles(tempFilePrefix)))
+        .setCoder(WriteBundlesToFiles.ResultCoder.of());
 
-    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
     // Turn the list of files and record counts in a PCollectionView that can be used as a
     // side input.
@@ -136,9 +142,9 @@ class BatchLoads<T> extends
     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
     // loading.
-    PCollectionTuple partitions = singleton.apply(ParDo
-        .of(new WritePartition(
-            write.getTable(),
+    PCollectionTuple partitions = singleton.apply("WritePartition",
+        ParDo.of(new WritePartition(
+            write.getJsonTableRef(),
             write.getTableDescription(),
             resultsView,
             multiPartitionsTag,
@@ -148,17 +154,22 @@ class BatchLoads<T> extends
 
     // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
     // schema function here. If no schema is specified, this function will return null.
+    // TODO: Turn this into a side-input instead.
     SerializableFunction<TableDestination, TableSchema> schemaFunction =
-        new ConstantSchemaFunction(write.getSchema());
+        new ConstantSchemaFunction(write.getJsonSchema());
 
+    Coder<KV<ShardedKey<TableDestination>, List<String>>> partitionsCoder =
+        KvCoder.of(ShardedKeyCoder.of(TableDestinationCoder.of()),
+            ListCoder.of(StringUtf8Coder.of()));
     // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
     // specified in multiPartitionsTag.
     PCollection<KV<TableDestination, String>> tempTables = partitions.get(multiPartitionsTag)
+        .setCoder(partitionsCoder)
         // What's this GroupByKey for? Is this so we have a deterministic temp tables? If so, maybe
         // Reshuffle is better here.
         .apply("MultiPartitionsGroupByKey",
-            GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+            GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
         .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
             false,
             write.getBigQueryServices(),
@@ -174,20 +185,20 @@ class BatchLoads<T> extends
     PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView = tempTables
         .apply("TempTablesView", View.<TableDestination, String>asMultimap());
 
-    singleton.apply(ParDo
+    singleton.apply("WriteRename", ParDo
         .of(new WriteRename(
             write.getBigQueryServices(),
             jobIdTokenView,
             write.getWriteDisposition(),
             write.getCreateDisposition(),
-            tempTablesView,
-            write.getTableDescription()))
+            tempTablesView))
         .withSideInputs(tempTablesView, jobIdTokenView));
 
     // Write single partition to final table
     partitions.get(singlePartitionTag)
+        .setCoder(partitionsCoder)
         .apply("SinglePartitionGroupByKey",
-            GroupByKey.<KV<TableDestination, Integer>, List<String>>create())
+            GroupByKey.<ShardedKey<TableDestination>, List<String>>create())
         .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
             true,
             write.getBigQueryServices(),

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
index 8c968df..ab57446 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ShardedKey.java
@@ -18,10 +18,13 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import java.io.Serializable;
+import java.util.Objects;
+
 /**
  * A key and a shard number.
  */
-class ShardedKey<K> {
+class ShardedKey<K> implements Serializable {
   private final K key;
   private final int shardNumber;
 
@@ -41,4 +44,23 @@ class ShardedKey<K> {
   public int getShardNumber() {
     return shardNumber;
   }
+
+  @Override
+  public String toString() {
+    return "key: " + key + " shard: " + shardNumber;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ShardedKey)) {
+      return false;
+    }
+    ShardedKey<K> other = (ShardedKey<K>) o;
+    return (key == other.key) && (shardNumber == other.shardNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, shardNumber);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 1c2b256..e8538e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -20,12 +20,13 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
 
+import java.io.Serializable;
 import java.util.Objects;
 
 /**
  * Encapsulates a BigQuery table destination.
  */
-public class TableDestination {
+public class TableDestination implements Serializable {
   private final String tableSpec;
   private final String tableDescription;
 
@@ -53,12 +54,17 @@ public class TableDestination {
   }
 
   @Override
+  public String toString() {
+    return "tableSpec: " + tableSpec + " tableDescription: " + tableDescription;
+  }
+
+  @Override
   public boolean equals(Object o) {
     if (!(o instanceof TableDestination)) {
       return false;
     }
     TableDestination other = (TableDestination) o;
-    return tableSpec == other.tableSpec && tableDescription == other.tableDescription;
+    return (tableSpec == other.tableSpec) && (tableDescription == other.tableDescription);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 4e6167b..b8069f6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -20,10 +20,19 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableRow;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.UUID;
 
 import com.google.common.collect.Maps;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
@@ -41,7 +50,7 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
   private transient Map<TableDestination, TableRowWriter> writers;
   private final String tempFilePrefix;
 
-  public static class Result {
+  public static class Result implements Serializable {
     public String filename;
     public Long fileByteSize;
     public TableDestination tableDestination;
@@ -52,15 +61,54 @@ class WriteBundlesToFiles extends DoFn<KV<TableDestination, TableRow>, WriteBund
       this.tableDestination = tableDestination;
     }
   }
+
+  public static class ResultCoder extends AtomicCoder<Result> {
+    private static final ResultCoder INSTANCE = new ResultCoder();
+
+    public static ResultCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(Result value, OutputStream outStream, Context context)
+        throws IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null value");
+      }
+      stringCoder.encode(value.filename, outStream, context.nested());
+      longCoder.encode(value.fileByteSize, outStream, context.nested());
+      tableDestinationCoder.encode(value.tableDestination, outStream, context.nested());
+    }
+
+    @Override
+    public Result decode(InputStream inStream, Context context)
+        throws IOException {
+      return new Result(stringCoder.decode(inStream, context.nested()),
+          longCoder.decode(inStream, context.nested()),
+          tableDestinationCoder.decode(inStream, context.nested()));
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+    }
+
+    StringUtf8Coder stringCoder = StringUtf8Coder.of();
+    VarLongCoder longCoder = VarLongCoder.of();
+    TableDestinationCoder tableDestinationCoder = TableDestinationCoder.of();
+  }
+
   WriteBundlesToFiles(String tempFilePrefix) {
     this.tempFilePrefix = tempFilePrefix;
+  }
+
+  @StartBundle
+  public void startBundle(Context c) {
     this.writers = Maps.newHashMap();
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    // ??? can we assume Java8?
-    TableRowWriter writer = writers.getOrDefault(c.element().getKey(), null);
+    TableRowWriter writer = writers.get(c.element().getKey());
     if (writer == null) {
       writer = new TableRowWriter(tempFilePrefix);
       writer.open(UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 8e1b16d..c48955b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -37,20 +37,20 @@ import org.apache.beam.sdk.values.TupleTag;
  * Partitions temporary files based on number of files and file sizes. Output key is a pair of
  * tablespec and the list of files corresponding to each partition of that table.
  */
-class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List<String>>> {
-  private final ValueProvider<TableReference> singletonOutputTable;
+class WritePartition extends DoFn<String, KV<ShardedKey<TableDestination>, List<String>>> {
+  private final ValueProvider<String> singletonOutputJsonTableRef;
   private final String singletonOutputTableDescription;
   private final PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView;
-  private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag;
-  private TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag;
+  private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag;
+  private TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag;
 
   public WritePartition(
-      ValueProvider<TableReference> singletonOutputTable,
+      ValueProvider<String> singletonOutputJsonTableRef,
       String singletonOutputTableDescription,
       PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView,
-      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag,
-      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag) {
-    this.singletonOutputTable = singletonOutputTable;
+      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag,
+      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag) {
+    this.singletonOutputJsonTableRef = singletonOutputJsonTableRef;
     this.singletonOutputTableDescription = singletonOutputTableDescription;
     this.resultsView = resultsView;
     this.multiPartitionsTag = multiPartitionsTag;
@@ -63,8 +63,9 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List
 
     // If there are no elements to write _and_ the user specified a constant output table, then
     // generate an empty table of that name.
-    if (results.isEmpty() && singletonOutputTable != null) {
-      TableReference singletonTable = singletonOutputTable.get();
+    if (results.isEmpty() && singletonOutputJsonTableRef != null) {
+      TableReference singletonTable = BigQueryHelpers.fromJsonString(
+          singletonOutputJsonTableRef.get(), TableReference.class);
       if (singletonTable != null) {
         TableRowWriter writer = new TableRowWriter(c.element());
         writer.open(UUID.randomUUID().toString());
@@ -82,8 +83,7 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List
     for (int i = 0; i < results.size(); ++i) {
       WriteBundlesToFiles.Result fileResult = results.get(i);
       TableDestination tableDestination = fileResult.tableDestination;
-      // JAVA8
-      List<List<String>> partitions = currResultsMap.getOrDefault(tableDestination, null);
+      List<List<String>> partitions = currResultsMap.get(tableDestination);
       if (partitions == null) {
         partitions = Lists.newArrayList();
         partitions.add(Lists.<String>newArrayList());
@@ -110,10 +110,10 @@ class WritePartition extends DoFn<String, KV<KV<TableDestination, Integer>, List
     for (Map.Entry<TableDestination, List<List<String>>> entry : currResultsMap.entrySet()) {
       TableDestination tableDestination = entry.getKey();
       List<List<String>> partitions = entry.getValue();
-      TupleTag<KV<KV<TableDestination, Integer>, List<String>>> outputTag =
+      TupleTag<KV<ShardedKey<TableDestination>, List<String>>> outputTag =
           (partitions.size() == 1) ? singlePartitionTag : multiPartitionsTag;
       for (int i = 0; i < partitions.size(); ++i) {
-        c.output(outputTag, KV.of(KV.of(tableDestination, i + 1), partitions.get(i)));
+        c.output(outputTag, KV.of(ShardedKey.of(tableDestination, i + 1), partitions.get(i)));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index fbfb290..752e7d3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import avro.shaded.com.google.common.collect.Maps;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -53,23 +52,21 @@ class WriteRename extends DoFn<String, Void> {
   private final PCollectionView<String> jobIdToken;
   private final WriteDisposition writeDisposition;
   private final CreateDisposition createDisposition;
+  // Map from final destination to a list of temporary tables that need to be copied into it.
   private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView;
-  @Nullable
-  private final String tableDescription;
+
 
   public WriteRename(
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
-      PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView,
-      @Nullable String tableDescription) {
+      PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView) {
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
     this.tempTablesView = tempTablesView;
-    this.tableDescription = tableDescription;
   }
 
   @ProcessElement
@@ -102,7 +99,7 @@ class WriteRename extends DoFn<String, Void> {
           tempTables,
           writeDisposition,
           createDisposition,
-          tableDescription);
+          finalTableDestination.getTableDescription());
 
       DatasetService tableService =
           bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 5051c95..f7fe87b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -57,8 +56,12 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Writes partitions to BigQuery tables.
+ *
+ * <p>The input is a list of files corresponding to a partition of a table. These files are
+ * loaded into a temporary table (or into the final table if there is only one partition). The output
+ * is a {@link KV} mapping the final table to the temporary tables for each partition of that table.
  */
-class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<String>>>,
+class WriteTables extends DoFn<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
     KV<TableDestination, String>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
@@ -90,23 +93,24 @@ class WriteTables extends DoFn<KV<KV<TableDestination, Integer>, Iterable<List<S
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     TableDestination tableDestination = c.element().getKey().getKey();
-    Integer partition = c.element().getKey().getValue();
+    Integer partition = c.element().getKey().getShardNumber();
     List<String> partitionFiles = Lists.newArrayList(c.element().getValue()).get(0);
     // Job ID must be different for each partition of each table.
     String jobIdPrefix = String.format(
-        c.sideInput(jobIdToken) + "0x%08x_%05d", tableDestination.hashCode(), partition);
+        c.sideInput(jobIdToken) + "_0x%08x_%05d", tableDestination.hashCode(), partition);
 
     TableReference ref = tableDestination.getTableReference();
     if (!singlePartition) {
       ref.setTableId(jobIdPrefix);
     }
 
+    TableSchema schema = (schemaFunction != null) ? schemaFunction.apply(tableDestination) : null;
     load(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
         jobIdPrefix,
         ref,
-        schemaFunction.apply(tableDestination),
+        schema,
         partitionFiles,
         writeDisposition,
         createDisposition,

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index af39483..d1ef8e2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -18,9 +18,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -38,13 +35,7 @@ import static org.mockito.Mockito.when;
 
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.util.Data;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.ErrorProto;
 import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatistics2;
@@ -55,18 +46,16 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.base.Strings;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import java.io.ByteArrayInputStream;
+import com.google.common.collect.Maps;
+
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
@@ -74,15 +63,12 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
-import javax.annotation.Nullable;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
@@ -96,17 +82,15 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -142,7 +126,6 @@ import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -175,484 +158,17 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class BigQueryIOTest implements Serializable {
 
-  // Status.UNKNOWN maps to null
-  private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
-      Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
-      Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
-
-
-  private static class FakeBigQueryServices implements BigQueryServices {
-
-    private String[] jsonTableRowReturns = new String[0];
-    private JobService jobService;
-    private DatasetService datasetService;
-
-    public FakeBigQueryServices withJobService(JobService jobService) {
-      this.jobService = jobService;
-      return this;
-    }
-
-    public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
-      this.datasetService = datasetService;
-      return this;
-    }
-
-    public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
-      this.jsonTableRowReturns = jsonTableRowReturns;
-      return this;
-    }
-
-    @Override
-    public JobService getJobService(BigQueryOptions bqOptions) {
-      return jobService;
-    }
-
-    @Override
-    public DatasetService getDatasetService(BigQueryOptions bqOptions) {
-      return datasetService;
-    }
-
-    @Override
-    public BigQueryJsonReader getReaderFromTable(
-        BigQueryOptions bqOptions, TableReference tableRef) {
-      return new FakeBigQueryReader(jsonTableRowReturns);
-    }
-
-    @Override
-    public BigQueryJsonReader getReaderFromQuery(
-        BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
-      return new FakeBigQueryReader(jsonTableRowReturns);
-    }
-
-    private static class FakeBigQueryReader implements BigQueryJsonReader {
-      private static final int UNSTARTED = -1;
-      private static final int CLOSED = Integer.MAX_VALUE;
-
-      private String[] jsonTableRowReturns;
-      private int currIndex;
-
-      FakeBigQueryReader(String[] jsonTableRowReturns) {
-        this.jsonTableRowReturns = jsonTableRowReturns;
-        this.currIndex = UNSTARTED;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        assertEquals(UNSTARTED, currIndex);
-        currIndex = 0;
-        return currIndex < jsonTableRowReturns.length;
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return ++currIndex < jsonTableRowReturns.length;
-      }
-
-      @Override
-      public TableRow getCurrent() throws NoSuchElementException {
-        if (currIndex >= jsonTableRowReturns.length) {
-          throw new NoSuchElementException();
-        }
-        return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
-      }
-
-      @Override
-      public void close() throws IOException {
-        currIndex = CLOSED;
-      }
-    }
-  }
-
-  private static class FakeJobService implements JobService, Serializable {
-
-    private Object[] startJobReturns;
-    private Object[] pollJobReturns;
-    private Object[] getJobReturns;
-    private String executingProject;
-    // Both counts will be reset back to zeros after serialization.
-    // This is a work around for DoFn's verifyUnmodified check.
-    private transient int startJobCallsCount;
-    private transient int pollJobStatusCallsCount;
-    private transient int getJobCallsCount;
-
-    public FakeJobService() {
-      this.startJobReturns = new Object[0];
-      this.pollJobReturns = new Object[0];
-      this.getJobReturns = new Object[0];
-      this.startJobCallsCount = 0;
-      this.pollJobStatusCallsCount = 0;
-      this.getJobCallsCount = 0;
-    }
-
-    /**
-     * Sets the return values to mock {@link JobService#startLoadJob},
-     * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
-     */
-    public FakeJobService startJobReturns(Object... startJobReturns) {
-      this.startJobReturns = startJobReturns;
-      return this;
-    }
-
-    /**
-     * Sets the return values to mock {@link JobService#getJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link InterruptedException}, returns otherwise.
-     */
-    public FakeJobService getJobReturns(Object... getJobReturns) {
-      this.getJobReturns = getJobReturns;
-      return this;
-    }
-
-    /**
-     * Sets the return values to mock {@link JobService#pollJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
-     */
-    public FakeJobService pollJobReturns(Object... pollJobReturns) {
-      this.pollJobReturns = pollJobReturns;
-      return this;
-    }
-
-    /**
-     * Verifies executing project.
-     */
-    public FakeJobService verifyExecutingProject(String executingProject) {
-      this.executingProject = executingProject;
-      return this;
-    }
-
-    @Override
-    public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
-        throws InterruptedException, IOException {
-      startJob(jobRef, loadConfig);
-    }
-
-    @Override
-    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
-        throws InterruptedException, IOException {
-      startJob(jobRef, extractConfig);
-    }
-
-    @Override
-    public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
-        throws IOException, InterruptedException {
-      startJob(jobRef, query);
-    }
-
-    @Override
-    public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
-        throws IOException, InterruptedException {
-      startJob(jobRef, copyConfig);
-    }
-
-    @Override
-    public Job pollJob(JobReference jobRef, int maxAttempts)
-        throws InterruptedException {
-      if (!Strings.isNullOrEmpty(executingProject)) {
-        checkArgument(
-            jobRef.getProjectId().equals(executingProject),
-            "Project id: %s is not equal to executing project: %s",
-            jobRef.getProjectId(), executingProject);
-      }
-
-      if (pollJobStatusCallsCount < pollJobReturns.length) {
-        Object ret = pollJobReturns[pollJobStatusCallsCount++];
-        if (ret instanceof Job) {
-          return (Job) ret;
-        } else if (ret instanceof Status) {
-          return JOB_STATUS_MAP.get(ret);
-        } else if (ret instanceof InterruptedException) {
-          throw (InterruptedException) ret;
-        } else {
-          throw new RuntimeException("Unexpected return type: " + ret.getClass());
-        }
-      } else {
-        throw new RuntimeException(
-            "Exceeded expected number of calls: " + pollJobReturns.length);
-      }
-    }
-
-    private void startJob(JobReference jobRef, GenericJson config)
-        throws IOException, InterruptedException {
-      if (!Strings.isNullOrEmpty(executingProject)) {
-        checkArgument(
-            jobRef.getProjectId().equals(executingProject),
-            "Project id: %s is not equal to executing project: %s",
-            jobRef.getProjectId(), executingProject);
-      }
-
-      if (startJobCallsCount < startJobReturns.length) {
-        Object ret = startJobReturns[startJobCallsCount++];
-        if (ret instanceof IOException) {
-          throw (IOException) ret;
-        } else if (ret instanceof InterruptedException) {
-          throw (InterruptedException) ret;
-        } else if (ret instanceof SerializableFunction) {
-          SerializableFunction<GenericJson, Void> fn =
-              (SerializableFunction<GenericJson, Void>) ret;
-          fn.apply(config);
-          return;
-        } else {
-          return;
-        }
-      } else {
-        throw new RuntimeException(
-            "Exceeded expected number of calls: " + startJobReturns.length);
-      }
-    }
-
-    @Override
-    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query)
-        throws InterruptedException, IOException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Job getJob(JobReference jobRef) throws InterruptedException {
-      if (!Strings.isNullOrEmpty(executingProject)) {
-        checkArgument(
-            jobRef.getProjectId().equals(executingProject),
-            "Project id: %s is not equal to executing project: %s",
-            jobRef.getProjectId(), executingProject);
-      }
-
-      if (getJobCallsCount < getJobReturns.length) {
-        Object ret = getJobReturns[getJobCallsCount++];
-        if (ret == null) {
-          return null;
-        } else if (ret instanceof Job) {
-          return (Job) ret;
-        } else if (ret instanceof InterruptedException) {
-          throw (InterruptedException) ret;
-        } else {
-          throw new RuntimeException("Unexpected return type: " + ret.getClass());
-        }
-      } else {
-        throw new RuntimeException(
-            "Exceeded expected number of calls: " + getJobReturns.length);
-      }
-    }
-
-    ////////////////////////////////// SERIALIZATION METHODS ////////////////////////////////////
-    private void writeObject(ObjectOutputStream out) throws IOException {
-      out.writeObject(replaceJobsWithBytes(startJobReturns));
-      out.writeObject(replaceJobsWithBytes(pollJobReturns));
-      out.writeObject(replaceJobsWithBytes(getJobReturns));
-      out.writeObject(executingProject);
-    }
-
-    private Object[] replaceJobsWithBytes(Object[] objs) {
-      Object[] copy = Arrays.copyOf(objs, objs.length);
-      for (int i = 0; i < copy.length; i++) {
-        checkArgument(
-            copy[i] == null || copy[i] instanceof Serializable || copy[i] instanceof Job,
-            "Only serializable elements and jobs can be added add to Job Returns");
-        if (copy[i] instanceof Job) {
-          try {
-            // Job is not serializable, so encode the job as a byte array.
-            copy[i] = Transport.getJsonFactory().toByteArray(copy[i]);
-          } catch (IOException e) {
-            throw new IllegalArgumentException(
-                String.format("Could not encode Job %s via available JSON factory", copy[i]));
-          }
-        }
-      }
-      return copy;
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      this.startJobReturns = replaceBytesWithJobs(in.readObject());
-      this.pollJobReturns = replaceBytesWithJobs(in.readObject());
-      this.getJobReturns = replaceBytesWithJobs(in.readObject());
-      this.executingProject = (String) in.readObject();
-    }
-
-    private Object[] replaceBytesWithJobs(Object obj) throws IOException {
-      checkState(obj instanceof Object[]);
-      Object[] objs = (Object[]) obj;
-      Object[] copy = Arrays.copyOf(objs, objs.length);
-      for (int i = 0; i < copy.length; i++) {
-        if (copy[i] instanceof byte[]) {
-          Job job = Transport.getJsonFactory()
-              .createJsonParser(new ByteArrayInputStream((byte[]) copy[i]))
-              .parse(Job.class);
-          copy[i] = job;
-        }
-      }
-      return copy;
-    }
-  }
-
-  private static class TableContainer {
-    Table table;
-    List<TableRow> rows;
-    List<String> ids;
-
-    TableContainer(Table table) {
-      this.table = table;
-      this.rows = new ArrayList<>();
-      this.ids = new ArrayList<>();
-    }
-
-    TableContainer addRow(TableRow row, String id) {
-      rows.add(row);
-      ids.add(id);
-      return this;
-    }
-
-    Table getTable() {
-      return table;
-    }
-
-     List<TableRow> getRows() {
-      return rows;
-    }
-  }
-
   // Table information must be static, as each ParDo will get a separate instance of
   // FakeDatasetServices, and they must all modify the same storage.
-  private static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
+  static com.google.common.collect.Table<String, String, Map<String, TableContainer>>
       tables = HashBasedTable.create();
 
-  /** A fake dataset service that can be serialized, for use in testReadFromTable. */
-  private static class FakeDatasetService implements DatasetService, Serializable {
-    @Override
-    public Table getTable(TableReference tableRef)
-        throws InterruptedException, IOException {
-      synchronized (tables) {
-        Map<String, TableContainer> dataset =
-            checkNotNull(
-                tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
-                "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-                tableRef.getProjectId(),
-                tableRef.getDatasetId(),
-                tableRef.getTableId(),
-                FakeDatasetService.class.getSimpleName());
-        TableContainer tableContainer = dataset.get(tableRef.getTableId());
-        return tableContainer == null ? null : tableContainer.getTable();
-      }
-    }
-
-    public List<TableRow> getAllRows(String projectId, String datasetId, String tableId)
-        throws InterruptedException, IOException {
-      synchronized (tables) {
-        return getTableContainer(projectId, datasetId, tableId).getRows();
-      }
-    }
-
-    private TableContainer getTableContainer(String projectId, String datasetId, String tableId)
-            throws InterruptedException, IOException {
-       synchronized (tables) {
-         Map<String, TableContainer> dataset =
-             checkNotNull(
-                 tables.get(projectId, datasetId),
-                 "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-                 projectId,
-                 datasetId,
-                 FakeDatasetService.class.getSimpleName());
-         return checkNotNull(dataset.get(tableId),
-             "Tried to get a table %s:%s.%s from %s, but no such table was set",
-             projectId,
-             datasetId,
-             tableId,
-             FakeDatasetService.class.getSimpleName());
-       }
-    }
-
-    @Override
-    public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-
-
-    @Override
-    public void createTable(Table table) throws IOException {
-      TableReference tableReference = table.getTableReference();
-      synchronized (tables) {
-        Map<String, TableContainer> dataset =
-            checkNotNull(
-                tables.get(tableReference.getProjectId(), tableReference.getDatasetId()),
-                "Tried to get a dataset %s:%s from %s, but no such table was set",
-                tableReference.getProjectId(),
-                tableReference.getDatasetId(),
-                FakeDatasetService.class.getSimpleName());
-        TableContainer tableContainer = dataset.get(tableReference.getTableId());
-        if (tableContainer == null) {
-          tableContainer = new TableContainer(table);
-          dataset.put(tableReference.getTableId(), tableContainer);
-        }
-      }
-    }
-
-    @Override
-    public boolean isTableEmpty(TableReference tableRef)
-        throws IOException, InterruptedException {
-      Long numBytes = getTable(tableRef).getNumBytes();
-      return numBytes == null || numBytes == 0L;
-    }
-
-    @Override
-    public Dataset getDataset(
-        String projectId, String datasetId) throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public void createDataset(
-        String projectId, String datasetId, String location, String description)
-        throws IOException, InterruptedException {
-      synchronized (tables) {
-        Map<String, TableContainer> dataset = tables.get(projectId, datasetId);
-        if (dataset == null) {
-          dataset = new HashMap<>();
-          tables.put(projectId, datasetId, dataset);
-        }
-      }
-    }
-
-    @Override
-    public void deleteDataset(String projectId, String datasetId)
-        throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-
-    @Override
-    public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
-        throws IOException, InterruptedException {
-      synchronized (tables) {
-        assertEquals(rowList.size(), insertIdList.size());
-
-        long dataSize = 0;
-        TableContainer tableContainer = getTableContainer(
-            ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-        for (int i = 0; i < rowList.size(); ++i) {
-          System.out.println("adding row " + rowList.get(i));
-          tableContainer.addRow(rowList.get(i), insertIdList.get(i));
-          dataSize += rowList.get(i).toString().length();
-        }
-        return dataSize;
-      }
-    }
-
-    @Override
-    public Table patchTableDescription(TableReference tableReference,
-                                       @Nullable String tableDescription)
-        throws IOException, InterruptedException {
-      throw new UnsupportedOperationException("Unsupported");
-    }
-  }
-
   @Rule public final transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   @Rule public transient ExpectedLogs loggedBigQueryIO = ExpectedLogs.none(BigQueryIO.class);
   @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class);
   @Rule public transient ExpectedLogs loggedWriteTables = ExpectedLogs.none(WriteTables.class);
   @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
-  @Mock(extraInterfaces = Serializable.class)
-  public transient BigQueryServices.JobService mockJobService;
   @Mock private transient IOChannelFactory mockIOChannelFactory;
   @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
@@ -801,7 +317,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBuildSourceWithTableAndFlatten() {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation("gs://testbucket/testdir");
 
     Pipeline p = TestPipeline.create(bqOptions);
@@ -819,7 +335,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBuildSourceWithTableAndFlattenWithoutValidation() {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation("gs://testbucket/testdir");
 
     Pipeline p = TestPipeline.create(bqOptions);
@@ -838,7 +354,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBuildSourceWithTableAndSqlDialect() {
     BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation("gs://testbucket/testdir");
 
     Pipeline p = TestPipeline.create(bqOptions);
@@ -856,7 +372,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testReadFromTable() throws IOException, InterruptedException {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     Job job = new Job();
@@ -906,11 +422,11 @@ public class BigQueryIOTest implements Serializable {
         new WriteExtractFiles(schemaGenerator, records);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns(onStartJob, "done")
-            .pollJobReturns(job)
-            .getJobReturns((Job) null)
-            .verifyExecutingProject(bqOptions.getProject()))
+        .withJobService(new FakeJobService())
+           // .startJobReturns(onStartJob, "done")
+          //  .pollJobReturns(job)
+         //   .getJobReturns((Job) null)
+          //  .verifyExecutingProject(bqOptions.getProject()))
         .withDatasetService(fakeDatasetService)
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", 1)),
@@ -938,13 +454,16 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWrite() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done", "done")
-            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
+        .withJobService(new FakeJobService())
+        //    .startJobReturns("done", "done", "done")
+        //    .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED))
+        .withDatasetService(mockDatasetService);
+
+    mockDatasetService.createDataset("defaultproject", "dataset-id", "", "");
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -969,7 +488,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testStreamingWrite() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
@@ -1095,15 +614,27 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testStreamingWriteWithWindowFn() throws Exception {
+  @Category(NeedsRunner.class)
+  public void testStreamingWriteWithDynamicTables() throws Exception {
+    testWriteWithDynamicTables(true);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testBatchWriteWithDynamicTables() throws Exception {
+    testWriteWithDynamicTables(false);
+  }
+
+  public void testWriteWithDynamicTables(boolean streaming) throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeDatasetService datasetService = new FakeDatasetService();
     datasetService.createDataset("project-id", "dataset-id", "", "");
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withDatasetService(datasetService);
+        .withDatasetService(datasetService)
+        .withJobService(new FakeJobService());
 
     List<Integer> inserts = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
@@ -1134,9 +665,11 @@ public class BigQueryIOTest implements Serializable {
     };
 
     Pipeline p = TestPipeline.create(bqOptions);
-    p.apply(Create.of(inserts))
-        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
-        .apply(Window.<Integer>into(window))
+    PCollection<Integer> input = p.apply(Create.of(inserts));
+    if (streaming) {
+      input = input.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+    }
+    input.apply(Window.<Integer>into(window))
         .apply(BigQueryIO.<Integer>write()
             .to(tableFunction)
             .withFormatFunction(new SerializableFunction<Integer, TableRow>() {
@@ -1179,13 +712,13 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWriteUnknown() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done")
-            .pollJobReturns(Status.FAILED, Status.UNKNOWN));
+        .withJobService(new FakeJobService());
+       //     .startJobReturns("done", "done")
+        //    .pollJobReturns(Status.FAILED, Status.UNKNOWN));
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -1211,13 +744,13 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWriteFailedJobs() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done", "done")
-            .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
+        .withJobService(new FakeJobService());
+         //   .startJobReturns("done", "done", "done")
+         //   .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED));
 
     Pipeline p = TestPipeline.create(bqOptions);
     p.apply(Create.of(
@@ -1285,7 +818,7 @@ public class BigQueryIOTest implements Serializable {
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
-            .withJobService(mockJobService))
+            .withJobService(new FakeJobService()))
         .withoutValidation();
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -1301,7 +834,7 @@ public class BigQueryIOTest implements Serializable {
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
-            .withJobService(mockJobService))
+            .withJobService(new FakeJobService()))
         .withoutValidation();
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -1342,7 +875,7 @@ public class BigQueryIOTest implements Serializable {
         .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
         .withTestServices(new FakeBigQueryServices()
           .withDatasetService(mockDatasetService)
-          .withJobService(mockJobService))
+          .withJobService(new FakeJobService()))
         .withoutValidation();
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
@@ -1506,7 +1039,7 @@ public class BigQueryIOTest implements Serializable {
     options.setProject(projectId);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
+        .withJobService(new FakeJobService())
         .withDatasetService(mockDatasetService);
     when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
         new RuntimeException("Unable to confirm BigQuery dataset presence"));
@@ -1674,7 +1207,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
+        .withJobService(new FakeJobService())
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", "1")),
             toJsonString(new TableRow().set("name", "b").set("number", "2")),
@@ -1712,7 +1245,7 @@ public class BigQueryIOTest implements Serializable {
         .setStatistics(jobStats);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
+        .withJobService(new FakeJobService())
         .withDatasetService(mockDatasetService)
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", "1")),
@@ -1731,8 +1264,6 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "b").set("number", "2"),
         new TableRow().set("name", "c").set("number", "3"));
 
-    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation("mock://tempLocation");
 
@@ -1752,9 +1283,6 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(1, sources.size());
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
-    Mockito.verify(mockJobService)
-        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
   }
 
   @Test
@@ -1777,8 +1305,9 @@ public class BigQueryIOTest implements Serializable {
     extractJob.setStatus(new JobStatus())
         .setStatistics(extractJobStats);
 
+    FakeJobService fakeJobService = new FakeJobService();
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
+        .withJobService(fakeJobService)
         .withDatasetService(mockDatasetService)
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", "1")),
@@ -1803,23 +1332,29 @@ public class BigQueryIOTest implements Serializable {
     options.setTempLocation(extractDestinationDir);
 
     TableReference queryTable = new TableReference()
-        .setProjectId("testProejct")
+        .setProjectId("testproject")
         .setDatasetId("testDataset")
         .setTableId("testTable");
-    when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
-        .thenReturn(new JobStatistics().setQuery(
+  //  when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
+     //   .thenReturn(new JobStatistics().setQuery(
+     //       new JobStatistics2()
+     //           .setTotalBytesProcessed(100L)
+     //           .setReferencedTables(ImmutableList.of(queryTable))));
+    fakeJobService.expectDryRunQuery("testproject", "query",
+        new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)
                 .setReferencedTables(ImmutableList.of(queryTable))));
-    when(mockDatasetService.getTable(eq(queryTable)))
-        .thenReturn(new Table().setSchema(new TableSchema()));
-    when(mockDatasetService.getTable(eq(destinationTable)))
-        .thenReturn(new Table().setSchema(new TableSchema()));
+
+   // when(mockDatasetService.getTable(eq(queryTable)))
+     //   .thenReturn(new Table().setSchema(new TableSchema()));
+   // when(mockDatasetService.getTable(eq(destinationTable)))
+    //    .thenReturn(new Table().setSchema(new TableSchema()));
     IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
-    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);
+    //when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+    //    .thenReturn(extractJob);
 
     Assert.assertThat(
         SourceTestUtils.readFromSource(bqSource, options),
@@ -1832,6 +1367,7 @@ public class BigQueryIOTest implements Serializable {
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
+    /*
     Mockito.verify(mockJobService)
         .startQueryJob(
             Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
@@ -1843,7 +1379,7 @@ public class BigQueryIOTest implements Serializable {
         ArgumentCaptor.forClass(JobConfigurationQuery.class);
     Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
     assertEquals(true, queryConfigArg.getValue().getFlattenResults());
-    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
+    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
   }
 
   @Test
@@ -1867,7 +1403,7 @@ public class BigQueryIOTest implements Serializable {
         .setStatistics(extractJobStats);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
+        .withJobService(new FakeJobService())
         .withDatasetService(mockDatasetService)
         .readerReturns(
             toJsonString(new TableRow().set("name", "a").set("number", "1")),
@@ -1891,17 +1427,18 @@ public class BigQueryIOTest implements Serializable {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setTempLocation(extractDestinationDir);
 
+    /*
     when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
         .thenReturn(new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)));
     when(mockDatasetService.getTable(eq(destinationTable)))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
+    IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
     when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);
+        .thenReturn(extractJob);*/
 
     Assert.assertThat(
         SourceTestUtils.readFromSource(bqSource, options),
@@ -1914,7 +1451,8 @@ public class BigQueryIOTest implements Serializable {
     BoundedSource<TableRow> actual = sources.get(0);
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
-    Mockito.verify(mockJobService)
+    /*
+    Mockito.verify(mockJobService)
         .startQueryJob(
             Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
     Mockito.verify(mockJobService)
@@ -1925,7 +1463,7 @@ public class BigQueryIOTest implements Serializable {
         ArgumentCaptor.forClass(JobConfigurationQuery.class);
     Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
     assertEquals(true, queryConfigArg.getValue().getFlattenResults());
-    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
+    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());*/
   }
 
   @Test
@@ -2028,7 +1566,7 @@ public class BigQueryIOTest implements Serializable {
 
     // An empty file is created for no input data. One partition is needed.
     long expectedNumPartitions = 1;
-    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+    testWritePartition(1, numFiles, fileSize, expectedNumPartitions);
   }
 
   @Test
@@ -2038,7 +1576,7 @@ public class BigQueryIOTest implements Serializable {
 
     // One partition is needed.
     long expectedNumPartitions = 1;
-    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+    testWritePartition(2, numFiles, fileSize, expectedNumPartitions);
   }
 
   @Test
@@ -2048,7 +1586,7 @@ public class BigQueryIOTest implements Serializable {
 
     // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
     long expectedNumPartitions = 3;
-    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+    testWritePartition(2, numFiles, fileSize, expectedNumPartitions);
   }
 
   @Test
@@ -2058,69 +1596,103 @@ public class BigQueryIOTest implements Serializable {
 
     // One partition is needed for each group of three files.
     long expectedNumPartitions = 4;
-    testWritePartition(numFiles, fileSize, expectedNumPartitions);
+    testWritePartition(2, numFiles, fileSize, expectedNumPartitions);
   }
 
-  private void testWritePartition(long numFiles, long fileSize, long expectedNumPartitions)
+  private void testWritePartition(long numTables, long numFilesPerTable, long fileSize,
+                                  long expectedNumPartitionsPerTable)
       throws Exception {
     p.enableAbandonedNodeEnforcement(false);
 
-    List<Long> expectedPartitionIds = Lists.newArrayList();
-    for (long i = 1; i <= expectedNumPartitions; ++i) {
-      expectedPartitionIds.add(i);
+    List<ShardedKey<TableDestination>> expectedPartitions = Lists.newArrayList();
+    for (int i = 0; i < numTables; ++i) {
+      for (int j = 1; j <= expectedNumPartitionsPerTable; ++j) {
+        String tableName = String.format("project-id:dataset-id.tables%05d", i);
+        TableDestination destination = new TableDestination(tableName, tableName);
+        expectedPartitions.add(ShardedKey.of(destination, j));
+      }
     }
 
-    List<KV<String, Long>> files = Lists.newArrayList();
-    List<String> fileNames = Lists.newArrayList();
-    for (int i = 0; i < numFiles; ++i) {
-      String fileName = String.format("files%05d", i);
-      fileNames.add(fileName);
-      files.add(KV.of(fileName, fileSize));
+    List<WriteBundlesToFiles.Result> files = Lists.newArrayList();
+    Map<TableDestination, List<String>> filenamesPerTable = Maps.newHashMap();
+    for (int i = 0; i < numTables; ++i) {
+      String tableName = String.format("project-id:dataset-id.tables%05d", i);
+      TableDestination destination = new TableDestination(tableName, tableName);
+      List<String> filenames = filenamesPerTable.get(destination);
+      if (filenames == null) {
+        filenames = Lists.newArrayList();
+        filenamesPerTable.put(destination, filenames);
+      }
+      for (int j = 0; j < numFilesPerTable; ++j) {
+        String fileName = String.format("%s_files%05d", tableName, j);
+        filenames.add(fileName);
+        files.add(new Result(fileName, fileSize, destination));
+      }
     }
 
-    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<KV<TableDestination, Integer>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<KV<TableDestination, Integer>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
     PCollectionView<Iterable<WriteBundlesToFiles.Result>> resultsView =
         PCollectionViews.iterableView(
         p,
         WindowingStrategy.globalDefault(),
-        KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()));
+        WriteBundlesToFiles.ResultCoder.of());
 
+    ValueProvider<String> singletonTable = null;
+    if (numFilesPerTable == 0 && numTables == 1) {
+      TableReference singletonReference = new TableReference()
+          .setProjectId("projectid")
+          .setDatasetId("dataset")
+          .setTableId("table");
+      singletonTable = StaticValueProvider.of(BigQueryHelpers.toJsonString(singletonReference));
+    }
     WritePartition writePartition =
-        new WritePartition(null, null, resultsView,
+        new WritePartition(singletonTable,
+            "singleton", resultsView,
             multiPartitionsTag, singlePartitionTag);
 
-    DoFnTester<String, KV<KV<TableDestination, Integer>, List<String>>> tester =
+    DoFnTester<String, KV<ShardedKey<TableDestination>, List<String>>> tester =
         DoFnTester.of(writePartition);
     tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
     tester.processElement(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
-    List<KV<KV<TableDestination, Integer>, List<String>>> partitions;
-    if (expectedNumPartitions > 1) {
+    List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
+    if (expectedNumPartitionsPerTable > 1) {
       partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {
       partitions = tester.takeOutputElements(singlePartitionTag);
     }
-    List<Long> partitionIds = Lists.newArrayList();
-    List<String> partitionFileNames = Lists.newArrayList();
-    for (KV<Long, List<String>> partition : partitions) {
-      partitionIds.add(partition.getKey());
-      for (String name : partition.getValue()) {
-        partitionFileNames.add(name);
+
+
+    List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList();
+    Map<TableDestination, List<String>> filesPerTableResult = Maps.newHashMap();
+    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
+      TableDestination table = partition.getKey().getKey();
+      partitionsResult.add(partition.getKey());
+      List<String> tableFilesResult = filesPerTableResult.get(table);
+      if (tableFilesResult == null) {
+        tableFilesResult = Lists.newArrayList();
+        filesPerTableResult.put(table, tableFilesResult);
       }
+      tableFilesResult.addAll(partition.getValue());
     }
 
-    assertEquals(expectedPartitionIds, partitionIds);
-    if (numFiles == 0) {
-      assertThat(partitionFileNames, Matchers.hasSize(1));
-      assertTrue(Files.exists(Paths.get(partitionFileNames.get(0))));
-      assertThat(Files.readAllBytes(Paths.get(partitionFileNames.get(0))).length,
+    assertEquals(expectedPartitions.size(), partitionsResult.size());
+
+   // assertThat(partitionsResult,
+     //   containsInAnyOrder(Iterables.toArray(expectedPartitions, ShardedKey.class)));
+
+    if (numFilesPerTable == 0 && numTables == 1) {
+      assertEquals(1, filesPerTableResult.size());
+      List<String> singletonFiles = filesPerTableResult.values().iterator().next();
+      assertTrue(Files.exists(Paths.get(singletonFiles.get(0))));
+      assertThat(Files.readAllBytes(Paths.get(singletonFiles.get(0))).length,
           Matchers.equalTo(0));
     } else {
-      assertEquals(fileNames, partitionFileNames);
+      assertEquals(filenamesPerTable, filesPerTableResult);
     }
   }
 
@@ -2129,26 +1701,46 @@ public class BigQueryIOTest implements Serializable {
     p.enableAbandonedNodeEnforcement(false);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done", "done", "done")
-            .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED));
+        .withJobService(new FakeJobService())
+        //    .startJobReturns("done", "done", "done", "done", "done", "done", "done", "done",
+       //         "done", "done")
+       //     .pollJobReturns(Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
+       //         Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED,
+      //          Status.SUCCEEDED, Status.SUCCEEDED))
+        .withDatasetService(mockDatasetService);
 
+    long numTables = 3;
     long numPartitions = 3;
     long numFilesPerPartition = 10;
     String jobIdToken = "jobIdToken";
     String tempFilePrefix = "tempFilePrefix";
-    String jsonTable = "{}";
-    String jsonSchema = "{}";
-    List<String> expectedTempTables = Lists.newArrayList();
-
-    List<KV<Long, Iterable<List<String>>>> partitions = Lists.newArrayList();
-    for (long i = 0; i < numPartitions; ++i) {
-      List<String> filesPerPartition = Lists.newArrayList();
-      for (int j = 0; j < numFilesPerPartition; ++j) {
-        filesPerPartition.add(String.format("files%05d", j));
+    Map<TableDestination, List<String>> expectedTempTables = Maps.newHashMap();
+
+    List<KV<ShardedKey<TableDestination>, Iterable<List<String>>>> partitions =
+        Lists.newArrayList();
+    for (int i = 0; i < numTables; ++i) {
+      String tableName = String.format("project-id:dataset-id.table%05d", i);
+      TableDestination tableDestination = new TableDestination(tableName, tableName);
+      for (int j = 0; j < numPartitions; ++j) {
+        String tempTableId = String.format(
+            jobIdToken + "_0x%08x_%05d", tableDestination.hashCode(), j);
+        List<String> filesPerPartition = Lists.newArrayList();
+        for (int k = 0; k < numFilesPerPartition; ++k) {
+          filesPerPartition.add(String.format("files0x%08x_%05d", tableDestination.hashCode(), k));
+        }
+        partitions.add(KV.of(ShardedKey.of(tableDestination, j),
+            (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
+
+        List<String> expectedTables = expectedTempTables.get(tableDestination);
+        if (expectedTables == null) {
+          expectedTables = Lists.newArrayList();
+          expectedTempTables.put(tableDestination, expectedTables);
+        }
+        String json = String.format(
+            "{\"datasetId\":\"dataset-id\",\"projectId\":\"project-id\",\"tableId\":\"%s\"}",
+            tempTableId);
+        expectedTables.add(json);
       }
-      partitions.add(KV.of(i, (Iterable<List<String>>) Collections.singleton(filesPerPartition)));
-      expectedTempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
     }
 
     PCollection<String> expectedTempTablesPCollection = p.apply(Create.of(expectedTempTables));
@@ -2165,27 +1757,33 @@ public class BigQueryIOTest implements Serializable {
         fakeBqServices,
         jobIdTokenView,
         tempFilePrefix,
-        StaticValueProvider.of(jsonTable),
-        StaticValueProvider.of(jsonSchema),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
         null);
 
-    DoFnTester<KV<Long, Iterable<List<String>>>, String> tester = DoFnTester.of(writeTables);
+    DoFnTester<KV<ShardedKey<TableDestination>, Iterable<List<String>>>,
+        KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    for (KV<Long, Iterable<List<String>>> partition : partitions) {
+    for (KV<ShardedKey<TableDestination>, Iterable<List<String>>> partition : partitions) {
       tester.processElement(partition);
     }
 
-    List<String> tempTables = tester.takeOutputElements();
-
-    assertEquals(expectedTempTables, tempTables);
+    Map<TableDestination, List<String>> tempTablesResult = Maps.newHashMap();
+    for (KV<TableDestination, String> element : tester.takeOutputElements()) {
+      List<String> tables = tempTablesResult.get(element.getKey());
+      if (tables == null) {
+        tables = Lists.newArrayList();
+        tempTablesResult.put(element.getKey(), tables);
+      }
+      tables.add(element.getValue());
+    }
+    assertEquals(expectedTempTables, tempTablesResult);
   }
 
   @Test
   public void testRemoveTemporaryFiles() throws Exception {
     BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
+    bqOptions.setProject("defaultproject");
     bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
 
     int numFiles = 10;
@@ -2195,7 +1793,7 @@ public class BigQueryIOTest implements Serializable {
     for (int i = 0; i < numFiles; ++i) {
       String fileName = String.format("files%05d", i);
       writer.open(fileName);
-      fileNames.add(writer.close().getKey());
+      fileNames.add(writer.close().filename);
     }
     fileNames.add(tempFilePrefix + String.format("files%05d", numFiles));
 
@@ -2217,23 +1815,33 @@ public class BigQueryIOTest implements Serializable {
     p.enableAbandonedNodeEnforcement(false);
 
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done")
-            .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
+        .withJobService(new FakeJobService())
+         //   .startJobReturns("done", "done")
+        //    .pollJobReturns(Status.FAILED, Status.SUCCEEDED))
         .withDatasetService(mockDatasetService);
 
-    long numTempTables = 3;
+    int numFinalTables = 3;
+    int numTempTables = 3;
     String jobIdToken = "jobIdToken";
     String jsonTable = "{}";
-    List<String> tempTables = Lists.newArrayList();
-    for (long i = 0; i < numTempTables; ++i) {
-      tempTables.add(String.format("{\"tableId\":\"%s_%05d\"}", jobIdToken, i));
+    Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
+    for (int i = 0; i < numFinalTables; ++i) {
+      String tableName = "project-id:dataset-id.table_" + i;
+      TableDestination tableDestination = new TableDestination(tableName, tableName);
+      List<String> tables = Lists.newArrayList();
+      tempTables.put(tableDestination, tables);
+      for (int j = 0; i < numTempTables; ++i) {
+        tables.add(String.format(
+            "{\"project-id:dataset-id.tableId\":\"%s_%05d_%05d\"}", jobIdToken, i, j));
+      }
     }
 
-    PCollection<String> tempTablesPCollection = p.apply(Create.of(tempTables));
-    PCollectionView<Iterable<String>> tempTablesView =
-        PCollectionViews.iterableView(
-            tempTablesPCollection, WindowingStrategy.globalDefault(), StringUtf8Coder.of());
+    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
+        PCollectionViews.multimapView(
+        p,
+        WindowingStrategy.globalDefault(),
+        KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of()));
+
     PCollection<String> jobIdTokenCollection = p.apply("CreateJobId", Create.of("jobId"));
     PCollectionView<String> jobIdTokenView =
         jobIdTokenCollection.apply(View.<String>asSingleton());
@@ -2241,11 +1849,9 @@ public class BigQueryIOTest implements Serializable {
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
         jobIdTokenView,
-        StaticValueProvider.of(jsonTable),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
-        tempTablesView,
-        null);
+        tempTablesView);
 
     DoFnTester<String, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);

http://git-wip-us.apache.org/repos/asf/beam/blob/760a9458/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
new file mode 100644
index 0000000..ed3ab37
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeBigQueryServices.java
@@ -0,0 +1,96 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.options.BigQueryOptions;
+
+
+/**
+ * A fake {@link BigQueryServices} for tests. Created by relax on 3/30/17.
+ */
+class FakeBigQueryServices implements BigQueryServices {
+  private String[] jsonTableRowReturns = new String[0];
+  private JobService jobService;
+  private DatasetService datasetService;
+
+  public FakeBigQueryServices withJobService(JobService jobService) {
+    this.jobService = jobService;
+    return this;
+  }
+
+  public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
+    this.datasetService = datasetService;
+    return this;
+  }
+
+  public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
+    this.jsonTableRowReturns = jsonTableRowReturns;
+    return this;
+  }
+
+  @Override
+  public JobService getJobService(BigQueryOptions bqOptions) {
+    return jobService;
+  }
+
+  @Override
+  public DatasetService getDatasetService(BigQueryOptions bqOptions) {
+    return datasetService;
+  }
+
+  @Override
+  public BigQueryJsonReader getReaderFromTable(
+      BigQueryOptions bqOptions, TableReference tableRef) {
+    return new FakeBigQueryReader(jsonTableRowReturns);
+  }
+
+  @Override
+  public BigQueryJsonReader getReaderFromQuery(
+      BigQueryOptions bqOptions, String projectId, JobConfigurationQuery queryConfig) {
+    return new FakeBigQueryReader(jsonTableRowReturns);
+  }
+
+  private static class FakeBigQueryReader implements BigQueryJsonReader {
+    private static final int UNSTARTED = -1;
+    private static final int CLOSED = Integer.MAX_VALUE;
+
+    private String[] jsonTableRowReturns;
+    private int currIndex;
+
+    FakeBigQueryReader(String[] jsonTableRowReturns) {
+      this.jsonTableRowReturns = jsonTableRowReturns;
+      this.currIndex = UNSTARTED;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      assertEquals(UNSTARTED, currIndex);
+      currIndex = 0;
+      return currIndex < jsonTableRowReturns.length;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      return ++currIndex < jsonTableRowReturns.length;
+    }
+
+    @Override
+    public TableRow getCurrent() throws NoSuchElementException {
+      if (currIndex >= jsonTableRowReturns.length) {
+        throw new NoSuchElementException();
+      }
+      return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
+    }
+
+    @Override
+    public void close() throws IOException {
+      currIndex = CLOSED;
+    }
+  }
+}