Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 05:11:03 UTC

[04/10] beam git commit: Refactor streaming write branch into separate reusable components.

Refactor streaming write branch into separate reusable components.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58ed5c7e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58ed5c7e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58ed5c7e

Branch: refs/heads/master
Commit: 58ed5c7ecd247f9c5e5a15deff40ffa8c800af25
Parents: 67a5f82
Author: Reuven Lax <re...@google.com>
Authored: Tue Mar 28 19:34:56 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:50 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  69 ++++++------
 .../beam/sdk/io/gcp/bigquery/CreateTables.java  | 100 +++++++++++++++++
 .../io/gcp/bigquery/GenerateShardedTable.java   |  48 ++++++++
 .../beam/sdk/io/gcp/bigquery/PrepareWrite.java  |  65 ++++++-----
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    |  90 ---------------
 .../sdk/io/gcp/bigquery/StreamingInserts.java   | 110 +++++++++++++++++++
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |  82 +-------------
 .../sdk/io/gcp/bigquery/TableDestination.java   |  48 +++++++-
 .../io/gcp/bigquery/TableDestinationCoder.java  |  64 +++++++++++
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |  71 ++++++++++++
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  | 101 -----------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  18 +--
 12 files changed, 521 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index af0d561..af19b83 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,6 +40,7 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
@@ -67,6 +69,7 @@ import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -681,8 +684,8 @@ public class BigQueryIO {
     static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
     @Nullable abstract ValueProvider<String> getJsonTableRef();
-    @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableReference>
-      getTableRefFunction();
+    @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
+      getTableFunction();
     @Nullable abstract SerializableFunction<T, TableRow> getFormatFunction();
     /** Table schema. The schema is required only if the table does not exist. */
     @Nullable abstract ValueProvider<String> getJsonSchema();
@@ -783,7 +786,7 @@ public class BigQueryIO {
     private void ensureToNotCalledYet() {
       checkState(
           getJsonTableRef() == null && getTable() == null
-              && getTableRefFunction() == null, "to() already called");
+              && getTableFunction() == null, "to() already called");
     }
 
     /**
@@ -802,13 +805,16 @@ public class BigQueryIO {
     /** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
     public Write<T> to(ValueProvider<String> tableSpec) {
       ensureToNotCalledYet();
+      String tableDescription = getTableDescription();
+      if (tableDescription == null) {
+        tableDescription = "";
+      }
       return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
                   new TableRefToJson()))
-          .setTableRefFunction(new TranslateTableSpecFunction<T>(
-              new ConstantTableSpecFunction<T>(tableSpec)))
+          .setTableFunction(new ConstantTableFunction<T>(tableSpec, tableDescription))
           .build();
     }
 
@@ -819,6 +825,7 @@
     public Write<T> to(
-        SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
-      return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
+        SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
+      ensureToNotCalledYet();
+      return toBuilder().setTableFunction(tableFunction).build();
     }
 
     /**
@@ -828,7 +836,2 @@
-    private Write<T> toTableReference(
-        SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction) {
-      ensureToNotCalledYet();
-      return toBuilder().setTableRefFunction(tableRefFunction).build();
-    }
 
     /**
@@ -838,32 +846,19 @@ public class BigQueryIO {
       return toBuilder().setFormatFunction(formatFunction).build();
     }
 
-    private static class TranslateTableSpecFunction<T> implements
-        SerializableFunction<ValueInSingleWindow<T>, TableReference> {
-      private SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction;
-
-      TranslateTableSpecFunction(
-          SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
-        this.tableSpecFunction = tableSpecFunction;
-      }
+    static class ConstantTableFunction<T> implements
+        SerializableFunction<ValueInSingleWindow<T>, TableDestination> {
+      private final ValueProvider<String> tableSpec;
+      private final String tableDescription;
 
-      @Override
-      public TableReference apply(ValueInSingleWindow<T> value) {
-        return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value));
-      }
-    }
-
-    static class ConstantTableSpecFunction<T> implements
-        SerializableFunction<ValueInSingleWindow<T>, String> {
-      private ValueProvider<String> tableSpec;
-
-      ConstantTableSpecFunction(ValueProvider<String> tableSpec) {
+      ConstantTableFunction(ValueProvider<String> tableSpec, String tableDescription) {
         this.tableSpec = tableSpec;
+        this.tableDescription = tableDescription;
       }
 
       @Override
-      public String apply(ValueInSingleWindow<T> value) {
-        return tableSpec.get();
+      public TableDestination apply(ValueInSingleWindow<T> value) {
+        return new TableDestination(tableSpec.get(), tableDescription);
       }
     }
 
@@ -919,7 +914,7 @@ public class BigQueryIO {
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
       // Exactly one of the table and table reference can be configured.
-      checkState(getTableRefFunction() != null,
+      checkState(getTableFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
       checkArgument(getFormatFunction() != null,
@@ -978,10 +973,16 @@ public class BigQueryIO {
 
     @Override
     public WriteResult expand(PCollection<T> input) {
+      PCollection<KV<TableDestination, TableRow>> rowsWithDestination =
+          input.apply("PrepareWrite", ParDo.of(
+              new PrepareWrite<T>(getTableFunction(), getFormatFunction())))
+              .setCoder(KvCoder.of(TableDestinationCoder.of(), TableRowJsonCoder.of()));
+
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
-      // StreamWithDeDup and BigQuery's streaming import API.
+      // StreamingInserts and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED) {
-        return input.apply(new StreamWithDeDup<T>(this));
+        return rowsWithDestination.apply(new StreamingInserts(this));
       } else {
         return input.apply(new BatchLoadBigQuery<T>(this));
       }
@@ -1002,8 +1003,8 @@ public class BigQueryIO {
           .addIfNotNull(DisplayData.item("schema", getJsonSchema())
             .withLabel("Table Schema"));
 
-      if (getTableRefFunction() != null) {
-        builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
+      if (getTableFunction() != null) {
+        builder.add(DisplayData.item("tableFn", getTableFunction().getClass())
           .withLabel("Table Reference Function"));
       }
 
@@ -1025,7 +1026,7 @@ public class BigQueryIO {
     }
 
     /**
-     * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+     * Returns the table to write, or {@code null} if writing with {@code tableFunction}.
      *
      * <p>If the table's project is not specified, use the executing project.
      */
@@ -1066,7 +1067,7 @@ public class BigQueryIO {
    */
   @VisibleForTesting
   static void clearCreatedTables() {
-    StreamingWriteFn.clearCreatedTables();
+    CreateTables.clearCreatedTables();
   }
 
   /////////////////////////////////////////////////////////////////////////////

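For illustration, a sketch (not part of this commit) of a caller-supplied table
function under the new API; the "userId" field is hypothetical:

    // Hypothetical sketch: route each element to a per-user table with the new
    // TableDestination-returning table function. Assumes only types visible in
    // this diff (SerializableFunction, ValueInSingleWindow, TableDestination).
    SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> tableFunction =
        new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
          @Override
          public TableDestination apply(ValueInSingleWindow<TableRow> input) {
            String userId = (String) input.getValue().get("userId");
            return new TableDestination(
                "project-id:dataset-id.events_" + userId, /* tableDescription */ "");
          }
        };
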
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
new file mode 100644
index 0000000..e216553
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTables.java
@@ -0,0 +1,100 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Creates any tables needed before performing streaming writes to the tables. This is a
+ * side-effect {@link DoFn}, and returns the original collection unchanged.
+ */
+public class CreateTables extends DoFn<KV<TableDestination, TableRow>,
+    KV<TableDestination, TableRow>> {
+  private final CreateDisposition createDisposition;
+  private final BigQueryServices bqServices;
+  private final SerializableFunction<TableDestination, TableSchema> schemaFunction;
+
+  /**
+   * The set of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use a Guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  public CreateTables(CreateDisposition createDisposition, BigQueryServices bqServices,
+                      SerializableFunction<TableDestination, TableSchema> schemaFunction) {
+    this.createDisposition = createDisposition;
+    this.bqServices = bqServices;
+    this.schemaFunction = schemaFunction;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) throws InterruptedException, IOException {
+    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+    possibleCreateTable(options, context.element().getKey());
+    context.output(context.element());
+  }
+
+  private void possibleCreateTable(BigQueryOptions options, TableDestination tableDestination)
+      throws InterruptedException, IOException {
+    String tableSpec = tableDestination.getTableSpec();
+    TableReference tableReference = tableDestination.getTableReference();
+    String tableDescription = tableDestination.getTableDescription();
+    if (createDisposition != CreateDisposition.CREATE_NEVER
+        && !createdTables.contains(tableSpec)) {
+      synchronized (createdTables) {
+        // Another thread may have succeeded in creating the table in the meanwhile, so
+        // check again. This check isn't needed for correctness, but we add it to prevent
+        // every thread from attempting a create and overwhelming our BigQuery quota.
+        DatasetService datasetService = bqServices.getDatasetService(options);
+        if (!createdTables.contains(tableSpec)) {
+          TableSchema tableSchema = schemaFunction.apply(tableDestination);
+          if (datasetService.getTable(tableReference) == null) {
+            datasetService.createTable(
+                new Table()
+                    .setTableReference(tableReference)
+                    .setSchema(tableSchema)
+                    .setDescription(tableDescription));
+          }
+          createdTables.add(tableSpec);
+        }
+      }
+    }
+  }
+
+  static void clearCreatedTables() {
+    synchronized (createdTables) {
+      createdTables.clear();
+    }
+  }
+}

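The table-creation cache above uses a check / synchronize / re-check pattern. A
stripped-down, JDK-only sketch of that pattern, with the BigQuery call stubbed out:

    import java.util.Collections;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the pattern in CreateTables: a lock-free fast path, then a
    // re-check under the lock so the expensive call runs at most once per key
    // per JVM.
    class CreatedOnce {
      private static final Set<String> created =
          Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

      static void ensureCreated(String key) {
        if (!created.contains(key)) {        // cheap, lock-free fast path
          synchronized (created) {
            if (!created.contains(key)) {    // re-check under the lock
              expensiveCreate(key);          // stand-in for DatasetService.createTable(...)
              created.add(key);
            }
          }
        }
      }

      private static void expensiveCreate(String key) {}
    }
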
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
new file mode 100644
index 0000000..da3a70a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/GenerateShardedTable.java
@@ -0,0 +1,48 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Given a write to a specific table, assigns it to one of the
+ * {@link GenerateShardedTable#numShards} random shard keys generated for that table.
+ */
+class GenerateShardedTable extends DoFn<KV<TableDestination, TableRow>,
+    KV<ShardedKey<String>, TableRow>> {
+  private final int numShards;
+
+  GenerateShardedTable(int numShards) {
+    this.numShards = numShards;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
+    // Assign a random shard in [0, numShards) so each table's rows batch
+    // well into BigQuery streaming-insert calls.
+    String tableSpec = context.element().getKey().getTableSpec();
+    context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, numShards)),
+        context.element().getValue()));
+  }
+}

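The shard assignment above is a uniform random pick per element. In isolation:

    import java.util.concurrent.ThreadLocalRandom;

    // Standalone sketch of GenerateShardedTable's key assignment: a uniform
    // pick in [0, numShards) spreads one table's rows over enough keys to
    // parallelize writers while keeping each insertAll batch large.
    public class ShardAssignDemo {
      public static void main(String[] args) {
        int numShards = 50;
        String tableSpec = "project-id:dataset-id.table-id";
        int shard = ThreadLocalRandom.current().nextInt(0, numShards);
        System.out.println(tableSpec + " -> shard " + shard);
      }
    }
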
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
index 0c08e18..7712417 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PrepareWrite.java
@@ -1,3 +1,20 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import com.google.api.services.bigquery.model.TableReference;
@@ -6,8 +23,6 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
@@ -15,44 +30,38 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 
 /**
- * Prepare an input {@link PCollection<T>} for writing to BigQuery. Use the table-reference
+ * Prepare an input {@link PCollection} for writing to BigQuery. Use the table
  * function to determine which tables each element is written to, and format the element into a
  * {@link TableRow} using the user-supplied format function.
  */
-public class PrepareWrite<T> extends PTransform<PCollection<T>, PCollection<KV<String, TableRow>>> {
-  private static final String NAME = "PrepareWrite";
-  private SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
+public class PrepareWrite<T> extends DoFn<T, KV<TableDestination, TableRow>> {
+  private SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
   private SerializableFunction<T, TableRow> formatFunction;
 
-  public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction,
+  public PrepareWrite(SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
                       SerializableFunction<T, TableRow> formatFunction) {
-    super(NAME);
-    this.tableRefFunction = tableRefFunction;
+    this.tableFunction = tableFunction;
     this.formatFunction = formatFunction;
   }
 
-  @Override
-  public PCollection<KV<String, TableRow>> expand(PCollection<T> input) {
-    PCollection<KV<String, TableRow>> elementsByTable =
-        input.apply(ParDo.of(new DoFn<T, KV<String, TableRow>>() {
-      @ProcessElement
-      public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
-        String tableSpec = tableSpecFromWindowedValue(
-            context.getPipelineOptions().as(BigQueryOptions.class),
-            ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
-        TableRow tableRow = formatFunction.apply(context.element());
-        context.output(KV.of(tableSpec, tableRow));
-      }
-    }));
-    return elementsByTable;
+  @ProcessElement
+  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+    TableDestination tableDestination = tableSpecFromWindowedValue(
+        context.getPipelineOptions().as(BigQueryOptions.class),
+        ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
+    TableRow tableRow = formatFunction.apply(context.element());
+    context.output(KV.of(tableDestination, tableRow));
   }
 
-  private String tableSpecFromWindowedValue(BigQueryOptions options,
+  private TableDestination tableSpecFromWindowedValue(BigQueryOptions options,
                                             ValueInSingleWindow<T> value) {
-    TableReference table = tableRefFunction.apply(value);
-    if (Strings.isNullOrEmpty(table.getProjectId())) {
-      table.setProjectId(options.getProject());
+    TableDestination tableDestination = tableFunction.apply(value);
+    TableReference tableReference = tableDestination.getTableReference();
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(options.getProject());
+      tableDestination = new TableDestination(tableReference,
+          tableDestination.getTableDescription());
     }
-    return BigQueryHelpers.toTableSpec(table);
+    return tableDestination;
   }
 }

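PrepareWrite pairs each element's destination with its formatted row. A sketch of
a format function it would apply (the Event type is hypothetical):

    // Hypothetical sketch of a user-supplied format function consumed by
    // PrepareWrite: converts an assumed Event type into a BigQuery TableRow.
    class Event {
      String userId;
      long timestampMillis;
    }

    SerializableFunction<Event, TableRow> formatFunction =
        new SerializableFunction<Event, TableRow>() {
          @Override
          public TableRow apply(Event event) {
            return new TableRow()
                .set("user_id", event.userId)
                .set("ts", event.timestampMillis);
          }
        };
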
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
deleted file mode 100644
index 506a564..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableSchema;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
-* PTransform that performs streaming BigQuery write. To increase consistency,
-* it leverages BigQuery best effort de-dup mechanism.
- */
-class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
-  private final Write<T> write;
-
-  /** Constructor. */
-  StreamWithDeDup(Write<T> write) {
-    this.write = write;
-  }
-
-  @Override
-  protected Coder<Void> getDefaultOutputCoder() {
-    return VoidCoder.of();
-  }
-
-  @Override
-  public WriteResult expand(PCollection<T> input) {
-    // A naive implementation would be to simply stream data directly to BigQuery.
-    // However, this could occasionally lead to duplicated data, e.g., when
-    // a VM that runs this code is restarted and the code is re-run.
-
-    // The above risk is mitigated in this implementation by relying on
-    // BigQuery built-in best effort de-dup mechanism.
-
-    // To use this mechanism, each input TableRow is tagged with a generated
-    // unique id, which is then passed to BigQuery and used to ignore duplicates.
-
-    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
-        input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
-            input.getPipeline().getOptions().as(BigQueryOptions.class), write)));
-
-    // To prevent having the same TableRow processed more than once with regenerated
-    // different unique ids, this implementation relies on "checkpointing", which is
-    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
-    // performed by Reshuffle.
-    NestedValueProvider<TableSchema, String> schema =
-        write.getJsonSchema() == null
-            ? null
-            : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
-    tagged
-        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
-        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-        .apply(
-            ParDo.of(
-                new StreamingWriteFn(
-                    schema,
-                    write.getCreateDisposition(),
-                    write.getTableDescription(),
-                    write.getBigQueryServices())));
-
-    return WriteResult.in(input.getPipeline());
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
new file mode 100644
index 0000000..37afbdf
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * PTransform that performs a streaming BigQuery write. To increase consistency,
+ * it leverages BigQuery's best-effort de-duplication mechanism.
+ */
+class StreamingInserts
+    extends PTransform<PCollection<KV<TableDestination, TableRow>>, WriteResult> {
+  private final Write<?> write;
+
+  private static class ConstantSchemaFunction implements
+      SerializableFunction<TableDestination, TableSchema> {
+    private final @Nullable String jsonSchema;
+
+    ConstantSchemaFunction(TableSchema schema) {
+      this.jsonSchema = BigQueryHelpers.toJsonString(schema);
+    }
+
+    @Override
+    @Nullable
+    public TableSchema apply(TableDestination table) {
+      return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    }
+  }
+
+  /** Constructor. */
+  StreamingInserts(Write<?> write) {
+    this.write = write;
+  }
+
+  @Override
+  protected Coder<Void> getDefaultOutputCoder() {
+    return VoidCoder.of();
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<TableDestination, TableRow>> input) {
+    // Since BigQueryIO.java does not yet have support for per-table schemas, inject a constant
+    // schema function here. If no schema is specified, this function will return null.
+    SerializableFunction<TableDestination, TableSchema> schemaFunction =
+        new ConstantSchemaFunction(write.getSchema());
+
+    // A naive implementation would be to simply stream data directly to BigQuery.
+    // However, this could occasionally lead to duplicated data, e.g., when
+    // a VM that runs this code is restarted and the code is re-run.
+
+    // The above risk is mitigated in this implementation by relying on
+    // BigQuery built-in best effort de-dup mechanism.
+
+    // To use this mechanism, each input TableRow is tagged with a generated
+    // unique id, which is then passed to BigQuery and used to ignore duplicates.
+    PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input
+        .apply("CreateTables", ParDo.of(new CreateTables(write.getCreateDisposition(),
+            write.getBigQueryServices(), schemaFunction)))
+        // We create 50 keys per BigQuery table to generate output on. This is few enough that we
+        // get good batching into BigQuery's insert calls, and enough that we can max out the
+        // streaming insert quota.
+        .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable(50)))
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowJsonCoder.of()))
+        .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds()));
+
+    // To prevent having the same TableRow processed more than once with regenerated
+    // different unique ids, this implementation relies on "checkpointing", which is
+    // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
+    // performed by Reshuffle.
+    tagged
+        .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
+        .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
+        .apply("StreamingWrite",
+            ParDo.of(
+                new StreamingWriteFn(write.getBigQueryServices())));
+
+    return WriteResult.in(input.getPipeline());
+  }
+}

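ConstantSchemaFunction stores the schema as a JSON string because TableSchema is
not Java-serializable while SerializableFunction instances must be. The same
trick in isolation (toJsonString/fromJsonString are the BigQueryHelpers methods
used above):

    // Sketch: hold a non-Serializable API object inside a Serializable class
    // by storing its JSON form and re-parsing on demand.
    class SerializableSchemaHolder implements java.io.Serializable {
      private final String jsonSchema;  // String is Serializable; TableSchema is not.

      SerializableSchemaHolder(TableSchema schema) {
        this.jsonSchema = BigQueryHelpers.toJsonString(schema);
      }

      TableSchema get() {
        return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
      }
    }
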
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
index 1d93fa3..83ed3d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java
@@ -18,28 +18,16 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
@@ -52,48 +40,19 @@ import org.apache.beam.sdk.values.KV;
 @VisibleForTesting
 class StreamingWriteFn
     extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
-  /** TableSchema in JSON. Use String to make the class Serializable. */
-  @Nullable
-  private final ValueProvider<String> jsonTableSchema;
-
-  @Nullable private final String tableDescription;
-
   private final BigQueryServices bqServices;
 
   /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
   private transient Map<String, List<TableRow>> tableRows;
 
-  private final Write.CreateDisposition createDisposition;
-
   /** The list of unique ids for each BigQuery table row. */
   private transient Map<String, List<String>> uniqueIdsForTableRows;
 
-  /** The list of tables created so far, so we don't try the creation
-      each time. */
-  private static Set<String> createdTables =
-      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-
   /** Tracks bytes written, exposed as "ByteCount" Counter. */
   private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount");
 
-  /** Constructor. */
-  StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
-                   Write.CreateDisposition createDisposition,
-                   @Nullable String tableDescription, BigQueryServices bqServices) {
-    this.jsonTableSchema = schema == null ? null :
-        NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
-    this.createDisposition = createDisposition;
-    this.bqServices = checkNotNull(bqServices, "bqServices");
-    this.tableDescription = tableDescription;
-  }
-
-  /**
-   * Clear the cached map of created tables. Used for testing.
-   */
-  static void clearCreatedTables() {
-    synchronized (createdTables) {
-      createdTables.clear();
-    }
+  StreamingWriteFn(BigQueryServices bqServices) {
+    this.bqServices = bqServices;
   }
 
   /** Prepares a target BigQuery table. */
@@ -119,9 +78,8 @@ class StreamingWriteFn
   @FinishBundle
   public void finishBundle(Context context) throws Exception {
     BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-
     for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
-      TableReference tableReference = getOrCreateTable(options, entry.getKey());
+      TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
       flushRows(tableReference, entry.getValue(),
           uniqueIdsForTableRows.get(entry.getKey()), options);
     }
@@ -132,39 +90,6 @@ class StreamingWriteFn
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
-
-    builder
-        .addIfNotNull(DisplayData.item("schema", jsonTableSchema)
-          .withLabel("Table Schema"))
-        .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-          .withLabel("Table Description"));
-  }
-
-  public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
-      throws InterruptedException, IOException {
-    TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
-    if (createDisposition != createDisposition.CREATE_NEVER
-        && !createdTables.contains(tableSpec)) {
-      synchronized (createdTables) {
-        // Another thread may have succeeded in creating the table in the meanwhile, so
-        // check again. This check isn't needed for correctness, but we add it to prevent
-        // every thread from attempting a create and overwhelming our BigQuery quota.
-        DatasetService datasetService = bqServices.getDatasetService(options);
-        if (!createdTables.contains(tableSpec)) {
-          if (datasetService.getTable(tableReference) == null) {
-            TableSchema tableSchema = BigQueryIO.JSON_FACTORY.fromString(
-                jsonTableSchema.get(), TableSchema.class);
-            datasetService.createTable(
-                new Table()
-                    .setTableReference(tableReference)
-                    .setSchema(tableSchema)
-                    .setDescription(tableDescription));
-          }
-          createdTables.add(tableSpec);
-        }
-      }
-    }
-    return tableReference;
   }
 
   /**

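StreamingWriteFn accumulates rows (and their unique ids) per tableSpec within a
bundle and flushes once in finishBundle. The batching shape, sketched standalone
with the BigQuery calls stubbed out:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Standalone sketch of StreamingWriteFn's bundle-level batching.
    public class BundleBatchDemo {
      public static void main(String[] args) {
        Map<String, List<String>> tableRows = new HashMap<>();
        add(tableRows, "project:dataset.t1", "row-a");
        add(tableRows, "project:dataset.t1", "row-b");
        add(tableRows, "project:dataset.t2", "row-c");
        for (Map.Entry<String, List<String>> entry : tableRows.entrySet()) {
          // Stand-in for flushRows(parseTableSpec(entry.getKey()), entry.getValue(), ...).
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
      }

      static void add(Map<String, List<String>> map, String table, String row) {
        List<String> rows = map.get(table);
        if (rows == null) {
          rows = new ArrayList<>();
          map.put(table, rows);
        }
        rows.add(row);
      }
    }
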
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
index 3cbbf3b..631afeb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestination.java
@@ -1,7 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.api.services.bigquery.model.TableReference;
+
 /**
- * Created by relax on 3/28/17.
+ * Encapsulates a BigQuery table destination.
  */
 public class TableDestination {
+  private final String tableSpec;
+  private final String tableDescription;
+
+  public TableDestination(String tableSpec, String tableDescription) {
+    this.tableSpec = tableSpec;
+    this.tableDescription = tableDescription;
+  }
+
+  public TableDestination(TableReference tableReference, String tableDescription) {
+    this.tableSpec = BigQueryHelpers.toTableSpec(tableReference);
+    this.tableDescription = tableDescription;
+  }
+
+  public String getTableSpec() {
+    return tableSpec;
+  }
+
+  public TableReference getTableReference() {
+    return BigQueryHelpers.parseTableSpec(tableSpec);
+  }
+
+  public String getTableDescription() {
+    return tableDescription;
+  }
 }

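Both TableDestination constructors normalize onto a tableSpec string, so the two
forms below are interchangeable. A quick usage sketch:

    // Sketch: string-based and TableReference-based construction both
    // normalize to a "project:dataset.table" tableSpec internally.
    TableDestination fromSpec =
        new TableDestination("project-id:dataset-id.table-id", "events table");
    TableDestination fromRef =
        new TableDestination(
            new TableReference()
                .setProjectId("project-id")
                .setDatasetId("dataset-id")
                .setTableId("table-id"),
            "events table");
    // fromSpec.getTableSpec().equals(fromRef.getTableSpec()) is true.
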
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
new file mode 100644
index 0000000..fa24700
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableDestinationCoder.java
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A coder for {@link TableDestination} objects.
+ */
+public class TableDestinationCoder extends AtomicCoder<TableDestination> {
+  private static final TableDestinationCoder INSTANCE = new TableDestinationCoder();
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+
+  @JsonCreator
+  public static TableDestinationCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(TableDestination value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null value");
+    }
+    stringCoder.encode(value.getTableSpec(), outStream, context.nested());
+    stringCoder.encode(value.getTableDescription(), outStream, context);
+  }
+
+  @Override
+  public TableDestination decode(InputStream inStream, Context context) throws IOException {
+    return new TableDestination(
+        stringCoder.decode(inStream, context.nested()),
+        // Must mirror encode(): the second string is written with the outer context.
+        stringCoder.decode(inStream, context));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {}
+}

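A coder like this should round-trip exactly. A test-style sketch (not part of
this commit; the SDK's CoderProperties utilities do this more thoroughly):

    // Round-trip sketch: encode a TableDestination, decode it, compare fields.
    // Assumes java.io.ByteArrayOutputStream/ByteArrayInputStream imports.
    TableDestination original =
        new TableDestination("project-id:dataset-id.table-id", "description");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    TableDestinationCoder.of().encode(original, out, Coder.Context.OUTER);
    TableDestination decoded =
        TableDestinationCoder.of().decode(
            new ByteArrayInputStream(out.toByteArray()), Coder.Context.OUTER);
    // decoded.getTableSpec() and decoded.getTableDescription() match the original.
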
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
new file mode 100644
index 0000000..6f0186e
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Fn that tags each table row with a unique id.
+ * To avoid calling UUID.randomUUID() for each element, which can be costly,
+ * a randomUUID is generated only once per bucket of data. The actual unique
+ * id is created by concatenating this randomUUID with a sequential number.
+ */
+@VisibleForTesting
+class TagWithUniqueIds
+    extends DoFn<KV<ShardedKey<String>, TableRow>, KV<ShardedKey<String>, TableRowInfo>> {
+
+  private transient String randomUUID;
+  private transient long sequenceNo = 0L;
+
+  @StartBundle
+  public void startBundle(Context context) {
+    randomUUID = UUID.randomUUID().toString();
+  }
+
+  /** Tag the input with a unique id. */
+  @ProcessElement
+  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
+    String uniqueId = randomUUID + sequenceNo++;
+    context.output(KV.of(context.element().getKey(),
+        new TableRowInfo(context.element().getValue(), uniqueId)));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+  }
+}

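The unique-id scheme amortizes UUID generation: one UUID per bundle, then a long
increment per element. A JDK-only demonstration:

    import java.util.UUID;

    // Demo of TagWithUniqueIds' id scheme: a bundle-level UUID concatenated
    // with a per-element sequence number gives ids unique within the bundle
    // without a UUID.randomUUID() call per row.
    public class UniqueIdDemo {
      public static void main(String[] args) {
        String bundleUuid = UUID.randomUUID().toString();
        long sequenceNo = 0L;
        for (int i = 0; i < 3; i++) {
          System.out.println(bundleUuid + sequenceNo++);
        }
      }
    }
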
http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
deleted file mode 100644
index 4e50f7c..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.bigquery;
-
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import java.io.IOException;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
-
-/**
- * Fn that tags each table row with a unique id and destination table.
- * To avoid calling UUID.randomUUID() for each element, which can be costly,
- * a randomUUID is generated only once per bucket of data. The actual unique
- * id is created by concatenating this randomUUID with a sequential number.
- */
-@VisibleForTesting
-class TagWithUniqueIdsAndTable<T>
-    extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
-  /** TableSpec to write to in the case of a single static destination. */
-  private ValueProvider<String> tableSpec = null;
-
-  private final Write<T, ?> write;
-
-  private transient String randomUUID;
-  private transient long sequenceNo = 0L;
-
-  TagWithUniqueIdsAndTable(BigQueryOptions options,
-                           Write<T, ?> write) {
-    ValueProvider<TableReference> table = write.getTableWithDefaultProject(
-        options.as(BigQueryOptions.class));
-    if (table != null) {
-      this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
-    }
-    this.write = write;
-  }
-
-
-  @StartBundle
-  public void startBundle(Context context) {
-    randomUUID = UUID.randomUUID().toString();
-  }
-
-  /** Tag the input with a unique id. */
-  @ProcessElement
-  public void processElement(ProcessContext context, BoundedWindow window) throws IOException {
-    String uniqueId = randomUUID + sequenceNo++;
-    ThreadLocalRandom randomGenerator = ThreadLocalRandom.current();
-      String tableSpec = tableSpecFromWindowedValue(
-          context.getPipelineOptions().as(BigQueryOptions.class),
-          ValueInSingleWindow.of(context.element(), context.timestamp(), window, context.pane()));
-    // We output on keys 0-50 to ensure that there's enough batching for
-    // BigQuery.
-    context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
-        new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId)));
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-
-    builder.addIfNotNull(DisplayData.item("table", tableSpec));
-    builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass())
-        .withLabel("Table Reference Function"));
-  }
-
-  @VisibleForTesting
-  ValueProvider<String> getTableSpec() {
-    return tableSpec;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/58ed5c7e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 499aa74..d953edd 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -518,7 +518,6 @@ public class BigQueryIOTest implements Serializable {
 
   /** A fake dataset service that can be serialized, for use in testReadFromTable. */
   private static class FakeDatasetService implements DatasetService, Serializable {
-
     @Override
     public Table getTable(TableReference tableRef)
         throws InterruptedException, IOException {
@@ -1121,15 +1121,15 @@ public class BigQueryIOTest implements Serializable {
         }
     );
 
-    SerializableFunction<ValueInSingleWindow<Integer>, String> tableFunction =
-        new SerializableFunction<ValueInSingleWindow<Integer>, String>() {
+    SerializableFunction<ValueInSingleWindow<Integer>, TableDestination> tableFunction =
+        new SerializableFunction<ValueInSingleWindow<Integer>, TableDestination>() {
           @Override
-          public String apply(ValueInSingleWindow<Integer> input) {
+          public TableDestination apply(ValueInSingleWindow<Integer> input) {
             PartitionedGlobalWindow window = (PartitionedGlobalWindow) input.getWindow();
             // Check that we can access the element as well here.
             checkArgument(window.value.equals(Integer.toString(input.getValue() % 5)),
                 "Incorrect element");
-            return "project-id:dataset-id.table-id-" + window.value;
+            return new TableDestination("project-id:dataset-id.table-id-" + window.value, "");
           }
     };
 
@@ -1559,14 +1559,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testStreamingWriteFnCreateNever() throws Exception {
-    StreamingWriteFn fn = new StreamingWriteFn(
-        null, CreateDisposition.CREATE_NEVER, null, new FakeBigQueryServices());
-    assertEquals(BigQueryHelpers.parseTableSpec("dataset.table"),
-        fn.getOrCreateTable(null, "dataset.table"));
-  }
-
-  @Test
   public void testCreateNeverWithStreaming() throws Exception {
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     options.setProject("project");