You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/14 21:32:53 UTC

[1/2] beam git commit: Allow users to choose the BigQuery insertion method.

Repository: beam
Updated Branches:
  refs/heads/master 0f8e8dd70 -> f7e8f886c


Allow users to choose the BigQuery insertion method.

If choosing file load jobs on an unbounded PCollection,
a triggering frequency must be specified to control how
often load jobs are generated.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/075d4d45
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/075d4d45
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/075d4d45

Branch: refs/heads/master
Commit: 075d4d45a9cd398f3b4023b6efd495cc58eb9bdd
Parents: 0f8e8dd
Author: Reuven Lax <re...@relax-macbookpro2.roam.corp.google.com>
Authored: Sun Jul 30 11:17:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Aug 14 14:32:10 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 447 +++++++++++++------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 168 ++++++-
 .../sdk/io/gcp/bigquery/ReifyAsIterable.java    |  51 +++
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  15 +-
 .../sdk/io/gcp/bigquery/WritePartition.java     |  13 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   | 111 +++--
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  38 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 269 ++++++-----
 8 files changed, 770 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index e46b1d3..0a1306d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -19,6 +19,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
 
 import com.google.api.services.bigquery.model.TableRow;
@@ -26,9 +27,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
@@ -48,9 +50,15 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
@@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +102,12 @@ class BatchLoads<DestinationT>
   // The maximum size of a single file - 4TiB, just under the 5 TiB limit.
   static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);
 
+  static final int DEFAULT_NUM_FILE_SHARDS = 0;
+
+  // If user triggering is supplied, we will trigger the file write after this many records are
+  // written.
+  static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
+
   // The maximum number of retries to poll the status of a job.
   // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
   static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
@@ -110,6 +125,8 @@ class BatchLoads<DestinationT>
   private final Coder<DestinationT> destinationCoder;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
+  private int numFileShards;
+  private Duration triggeringFrequency;
 
   BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
              boolean singletonTable,
@@ -123,6 +140,8 @@ class BatchLoads<DestinationT>
     this.destinationCoder = destinationCoder;
     this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
     this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
+    this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
+    this.triggeringFrequency = null;
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
@@ -139,6 +158,14 @@ class BatchLoads<DestinationT>
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
   }
 
+  public void setTriggeringFrequency(Duration triggeringFrequency) {
+    this.triggeringFrequency = triggeringFrequency;
+  }
+
+  public void setNumFileShards(int numFileShards) {
+    this.numFileShards = numFileShards;
+  }
+
   @VisibleForTesting
   void setMaxFileSize(long maxFileSize) {
     this.maxFileSize = maxFileSize;
@@ -164,171 +191,323 @@ class BatchLoads<DestinationT>
     }
   }
 
-  @Override
-  public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
+  // Expand the pipeline when the user has requested periodically-triggered file writes.
+  private WriteResult expandTriggered(PCollection<KV<DestinationT, TableRow>> input) {
+    checkArgument(numFileShards > 0);
     Pipeline p = input.getPipeline();
+    final PCollectionView<String> jobIdTokenView = createJobIdView(p);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
+    // The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
+    // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
+    // is set to a large value, currently we have to buffer all the data until the trigger fires.
+    // Instead we ensure that the files are written if a threshold number of records are ready.
+    // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
+    // offload the data to the filesystem.
+    PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
+        input.apply(
+            "rewindowIntoGlobal",
+            Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
+                .triggering(
+                    Repeatedly.forever(
+                        AfterFirst.of(
+                            AfterProcessingTime.pastFirstElementInPane()
+                                .plusDelayOf(triggeringFrequency),
+                            AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
+                .discardingFiredPanes());
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
+        writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+
+    // Apply the user's trigger before we start generating BigQuery load jobs.
+    results =
+        results.apply(
+            "applyUserTrigger",
+            Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows())
+                .triggering(
+                    Repeatedly.forever(
+                        AfterProcessingTime.pastFirstElementInPane()
+                            .plusDelayOf(triggeringFrequency)))
+                .discardingFiredPanes());
 
-    // Create a singleton job ID token at execution time. This will be used as the base for all
-    // load jobs issued from this instance of the transform.
-    final PCollection<String> jobIdToken =
-        p.apply("TriggerIdCreation", Create.of("ignored"))
-            .apply(
-                "CreateJobId",
-                MapElements.via(
-                    new SimpleFunction<String, String>() {
-                      @Override
-                      public String apply(String input) {
-                        return BigQueryHelpers.randomUUIDString();
-                      }
-                    }));
-    final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton());
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag");
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag");
 
-    PCollectionView<String> tempFilePrefix = jobIdToken
+    // If we have non-default triggered output, we can't use the side-input technique used in
+    // expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
+    // determinism.
+    PCollectionTuple partitions =
+        results
+            .apply(
+                "AttachSingletonKey",
+                WithKeys.<Void, WriteBundlesToFiles.Result<DestinationT>>of((Void) null))
+            .setCoder(
+                KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+            .apply("GroupOntoSingleton", GroupByKey.<Void, Result<DestinationT>>create())
+            .apply("ExtractResultValues", Values.<Iterable<Result<DestinationT>>>create())
             .apply(
-                "GetTempFilePrefix",
+                "WritePartitionTriggered",
                 ParDo.of(
-                    new DoFn<String, String>() {
-                      @ProcessElement
-                      public void getTempFilePrefix(ProcessContext c) {
-                        String tempLocation = resolveTempLocation(
-                            c.getPipelineOptions().getTempLocation(),
-                            "BigQueryWriteTemp", c.element());
-                        LOG.info("Writing BigQuery temporary files to {} before loading them.",
-                            tempLocation);
-                        c.output(tempLocation);
-                      }
-                    }))
-            .apply("TempFilePrefixView", View.<String>asSingleton());
+                        new WritePartition<>(
+                            singletonTable,
+                            dynamicDestinations,
+                            tempFilePrefixView,
+                            multiPartitionsTag,
+                            singlePartitionTag))
+                    .withSideInputs(tempFilePrefixView)
+                    .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+    PCollection<KV<TableDestination, String>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView);
+    tempTables
+        // Now that the load job has happened, we want the rename to happen immediately.
+        .apply(
+            Window.<KV<TableDestination, String>>into(new GlobalWindows())
+                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
+        .apply(WithKeys.<Void, KV<TableDestination, String>>of((Void) null))
+        .setCoder(
+            KvCoder.of(
+                VoidCoder.of(), KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+        .apply(GroupByKey.<Void, KV<TableDestination, String>>create())
+        .apply(Values.<Iterable<KV<TableDestination, String>>>create())
+        .apply(
+            "WriteRenameTriggered",
+            ParDo.of(
+                    new WriteRename(
+                        bigQueryServices, jobIdTokenView, writeDisposition, createDisposition))
+                .withSideInputs(jobIdTokenView));
+    writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView);
+    return writeResult(p);
+  }
 
+  // Expand the pipeline when the user has not requested periodically-triggered file writes.
+  public WriteResult expandUntriggered(PCollection<KV<DestinationT, TableRow>> input) {
+    Pipeline p = input.getPipeline();
+    final PCollectionView<String> jobIdTokenView = createJobIdView(p);
+    final PCollectionView<String> tempFilePrefixView = createTempFilePrefixView(jobIdTokenView);
     PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
             "rewindowIntoGlobal",
             Window.<KV<DestinationT, TableRow>>into(new GlobalWindows())
                 .triggering(DefaultTrigger.of())
                 .discardingFiredPanes());
-    PCollectionView<Map<DestinationT, String>> schemasView =
-        inputInGlobalWindow.apply(new CalculateSchemas<>(dynamicDestinations));
+    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
+        (numFileShards == 0)
+            ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
+            : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
+
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+
+    // This transform will look at the set of files written for each table, and if any table has
+    // too many files or bytes, will partition that table's files into multiple partitions for
+    // loading.
+    PCollectionTuple partitions =
+        results
+            .apply("ReifyResults", new ReifyAsIterable<WriteBundlesToFiles.Result<DestinationT>>())
+            .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+            .apply(
+                "WritePartitionUntriggered",
+                ParDo.of(
+                        new WritePartition<>(
+                            singletonTable,
+                            dynamicDestinations,
+                            tempFilePrefixView,
+                            multiPartitionsTag,
+                            singlePartitionTag))
+                    .withSideInputs(tempFilePrefixView)
+                    .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+    PCollection<KV<TableDestination, String>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), jobIdTokenView);
+
+    tempTables
+        .apply("ReifyRenameInput", new ReifyAsIterable<KV<TableDestination, String>>())
+        .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), StringUtf8Coder.of())))
+        .apply(
+            "WriteRenameUntriggered",
+            ParDo.of(
+                    new WriteRename(
+                        bigQueryServices, jobIdTokenView, writeDisposition, createDisposition))
+                .withSideInputs(jobIdTokenView));
+    writeSinglePartition(partitions.get(singlePartitionTag), jobIdTokenView);
+    return writeResult(p);
+  }
 
+  // Generate the base job id string.
+  private PCollectionView<String> createJobIdView(Pipeline p) {
+    // Create a singleton job ID token at execution time. This will be used as the base for all
+    // load jobs issued from this instance of the transform.
+    return p.apply("JobIdCreationRoot", Create.of((Void) null))
+        .apply(
+            "CreateJobId",
+            MapElements.via(
+                new SimpleFunction<Void, String>() {
+                  @Override
+                  public String apply(Void input) {
+                    return BigQueryHelpers.randomUUIDString();
+                  }
+                }))
+        .apply(View.<String>asSingleton());
+  }
+
+  // Generate the temporary-file prefix.
+  private PCollectionView<String> createTempFilePrefixView(PCollectionView<String> jobIdView) {
+    return ((PCollection<String>) jobIdView.getPCollection())
+        .apply(
+            "GetTempFilePrefix",
+            ParDo.of(
+                new DoFn<String, String>() {
+                  @ProcessElement
+                  public void getTempFilePrefix(ProcessContext c) {
+                    String tempLocation =
+                        resolveTempLocation(
+                            c.getPipelineOptions().getTempLocation(),
+                            "BigQueryWriteTemp",
+                            c.element());
+                    LOG.info(
+                        "Writing BigQuery temporary files to {} before loading them.",
+                        tempLocation);
+                    c.output(tempLocation);
+                  }
+                }))
+        .apply("TempFilePrefixView", View.<String>asSingleton());
+  }
+
+  // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
+  // file byte size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
+      PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
     TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
-        new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles"){};
+        new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
     TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag =
         new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
-    PCollectionTuple writeBundlesTuple = inputInGlobalWindow
-            .apply("WriteBundlesToFiles",
-                ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
-                    maxNumWritersPerBundle, maxFileSize))
-                    .withSideInputs(tempFilePrefix)
-                    .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+    PCollectionTuple writeBundlesTuple =
+        input.apply(
+            "WriteBundlesToFiles",
+            ParDo.of(
+                    new WriteBundlesToFiles<>(
+                        tempFilePrefix, unwrittedRecordsTag, maxNumWritersPerBundle, maxFileSize))
+                .withSideInputs(tempFilePrefix)
+                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
-        writeBundlesTuple.get(writtenFilesTag)
-        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+        writeBundlesTuple
+            .get(writtenFilesTag)
+            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+    PCollection<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecords =
+        writeBundlesTuple
+            .get(unwrittedRecordsTag)
+            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()));
 
     // If the bundles contain too many output tables to be written inline to files (due to memory
     // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
     // Group these records by key, and write the files after grouping. Since the record is grouped
     // by key, we can ensure that only one file is open at a time in each bundle.
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped =
-        writeBundlesTuple
-            .get(unwrittedRecordsTag)
-            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()))
-            .apply(GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
-            .apply(
-                ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
-                    .withSideInputs(tempFilePrefix))
-            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+        writeShardedRecords(unwrittenRecords, tempFilePrefix);
 
     // PCollection of filename, file byte size, and table destination.
-    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
-        PCollectionList.of(writtenFiles).and(writtenFilesGrouped)
-        .apply(Flatten.<Result<DestinationT>>pCollections());
+    return PCollectionList.of(writtenFiles)
+        .and(writtenFilesGrouped)
+        .apply("FlattenFiles", Flatten.<Result<DestinationT>>pCollections())
+        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+  }
 
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+  // Writes input data to statically-sharded files. Returns a PCollection of filename,
+  // file byte size, and table destination.
+  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
+      PCollection<KV<DestinationT, TableRow>> input, PCollectionView<String> tempFilePrefix) {
+    checkState(numFileShards > 0);
+    PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords =
+        input
+            .apply(
+                "AddShard",
+                ParDo.of(
+                    new DoFn<KV<DestinationT, TableRow>, KV<ShardedKey<DestinationT>, TableRow>>() {
+                      int shardNumber;
 
-    // Turn the list of files and record counts in a PCollectionView that can be used as a
-    // side input.
-    PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> resultsView =
-        results.apply("ResultsView",
-            View.<WriteBundlesToFiles.Result<DestinationT>>asIterable());
-    // This transform will look at the set of files written for each table, and if any table has
-    // too many files or bytes, will partition that table's files into multiple partitions for
-    // loading.
-    PCollection<Void> singleton = p.apply("singleton",
-        Create.of((Void) null).withCoder(VoidCoder.of()));
-    PCollectionTuple partitions =
-        singleton.apply(
-            "WritePartition",
-            ParDo.of(
-                    new WritePartition<>(
-                        singletonTable,
-                        dynamicDestinations,
-                        tempFilePrefix,
-                        resultsView,
-                        multiPartitionsTag,
-                        singlePartitionTag))
-                .withSideInputs(tempFilePrefix, resultsView)
-                .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
-    List<PCollectionView<?>> writeTablesSideInputs =
-        Lists.newArrayList(jobIdTokenView, schemasView);
-    writeTablesSideInputs.addAll(dynamicDestinations.getSideInputs());
+                      @Setup
+                      public void setup() {
+                        shardNumber = ThreadLocalRandom.current().nextInt(numFileShards);
+                      }
+
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        DestinationT destination = c.element().getKey();
+                        TableRow tableRow = c.element().getValue();
+                        c.output(
+                            KV.of(
+                                ShardedKey.of(destination, ++shardNumber % numFileShards),
+                                tableRow));
+                      }
+                    }))
+            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), TableRowJsonCoder.of()));
+
+    return writeShardedRecords(shardedRecords, tempFilePrefix);
+  }
+
+  private PCollection<Result<DestinationT>> writeShardedRecords(
+      PCollection<KV<ShardedKey<DestinationT>, TableRow>> shardedRecords,
+      PCollectionView<String> tempFilePrefix) {
+    return shardedRecords
+        .apply("GroupByDestination", GroupByKey.<ShardedKey<DestinationT>, TableRow>create())
+        .apply(
+            "WriteGroupedRecords",
+            ParDo.of(new WriteGroupedRecordsToFiles<DestinationT>(tempFilePrefix, maxFileSize))
+                .withSideInputs(tempFilePrefix))
+        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
+  }
+
+  // Take in a list of files and write them to temporary tables.
+  private PCollection<KV<TableDestination, String>> writeTempTables(
+      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+      PCollectionView<String> jobIdTokenView) {
+    List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView);
+    sideInputs.addAll(dynamicDestinations.getSideInputs());
 
     Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
-          KvCoder.of(
-              ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
-              ListCoder.of(StringUtf8Coder.of()));
+        KvCoder.of(
+            ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
+            ListCoder.of(StringUtf8Coder.of()));
 
     // If WriteBundlesToFiles produced more than MAX_NUM_FILES files or MAX_SIZE_BYTES bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
     // specified in multiPartitionsTag.
-    PCollection<KV<TableDestination, String>> tempTables =
-        partitions
-            .get(multiPartitionsTag)
-            .setCoder(partitionsCoder)
-            // Reshuffle will distribute this among multiple workers, and also guard against
-            // reexecution of the WritePartitions step once WriteTables has begun.
-            .apply(
-                "MultiPartitionsReshuffle",
-                Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
-            .apply(
-                "MultiPartitionsWriteTables",
-                ParDo.of(
-                        new WriteTables<>(
-                            false,
-                            bigQueryServices,
-                            jobIdTokenView,
-                            schemasView,
-                            WriteDisposition.WRITE_EMPTY,
-                            CreateDisposition.CREATE_IF_NEEDED,
-                            dynamicDestinations))
-                    .withSideInputs(writeTablesSideInputs));
-
-    // This view maps each final table destination to the set of temporary partitioned tables
-    // the PCollection was loaded into.
-    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
-        tempTables.apply("TempTablesView", View.<TableDestination, String>asMultimap());
-
-    singleton.apply(
-        "WriteRename",
-        ParDo.of(
-                new WriteRename(
-                    bigQueryServices,
-                    jobIdTokenView,
-                    writeDisposition,
-                    createDisposition,
-                    tempTablesView))
-            .withSideInputs(tempTablesView, jobIdTokenView));
+    return input
+        .setCoder(partitionsCoder)
+        // Reshuffle will distribute this among multiple workers, and also guard against
+        // reexecution of the WritePartitions step once WriteTables has begun.
+        .apply("MultiPartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
+        .apply(
+            "MultiPartitionsWriteTables",
+            ParDo.of(
+                    new WriteTables<>(
+                        false,
+                        bigQueryServices,
+                        jobIdTokenView,
+                        WriteDisposition.WRITE_EMPTY,
+                        CreateDisposition.CREATE_IF_NEEDED,
+                        dynamicDestinations))
+                .withSideInputs(sideInputs));
+  }
 
+  // In the case where the files fit into a single load job, there's no need to write temporary
+  // tables and rename. We can load these files directly into the target BigQuery table.
+  void writeSinglePartition(
+      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+      PCollectionView<String> jobIdTokenView) {
+    List<PCollectionView<?>> sideInputs = Lists.<PCollectionView<?>>newArrayList(jobIdTokenView);
+    sideInputs.addAll(dynamicDestinations.getSideInputs());
+    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+        KvCoder.of(
+            ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
+            ListCoder.of(StringUtf8Coder.of()));
     // Write single partition to final table
-    partitions
-        .get(singlePartitionTag)
+    input
         .setCoder(partitionsCoder)
         // Reshuffle will distribute this among multiple workers, and also guard against
         // reexecution of the WritePartitions step once WriteTables has begun.
-        .apply(
-            "SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
+        .apply("SinglePartitionsReshuffle", Reshuffle.<ShardedKey<DestinationT>, List<String>>of())
         .apply(
             "SinglePartitionWriteTables",
             ParDo.of(
@@ -336,14 +515,20 @@ class BatchLoads<DestinationT>
                         true,
                         bigQueryServices,
                         jobIdTokenView,
-                        schemasView,
                         writeDisposition,
                         createDisposition,
                         dynamicDestinations))
-                .withSideInputs(writeTablesSideInputs));
+                .withSideInputs(sideInputs));
+  }
 
+  private WriteResult writeResult(Pipeline p) {
     PCollection<TableRow> empty =
         p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
-    return WriteResult.in(input.getPipeline(), new TupleTag<TableRow>("failedInserts"), empty);
+    return WriteResult.in(p, new TupleTag<TableRow>("failedInserts"), empty);
+  }
+
+  @Override
+  public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
+    return (triggeringFrequency != null) ? expandTriggered(input) : expandUntriggered(input);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6edbd06..feb085d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -92,19 +92,20 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link PTransform}s for reading and writing
- * <a href="https://developers.google.com/bigquery/">BigQuery</a> tables.
+ * {@link PTransform}s for reading and writing <a
+ * href="https://developers.google.com/bigquery/">BigQuery</a> tables.
  *
  * <h3>Table References</h3>
  *
  * <p>A fully-qualified BigQuery table name consists of three components:
+ *
  * <ul>
- *   <li>{@code projectId}: the Cloud project id (defaults to
- *       {@link GcpOptions#getProject()}).
+ *   <li>{@code projectId}: the Cloud project id (defaults to {@link GcpOptions#getProject()}).
  *   <li>{@code datasetId}: the BigQuery dataset id, unique within a project.
  *   <li>{@code tableId}: a table id, unique within a dataset.
  * </ul>
@@ -122,8 +123,8 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Reading</h3>
  *
- * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
- * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
+ * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation. This produces a
+ * {@link PCollection} of {@link TableRow TableRows} as output:
  *
  * <pre>{@code
  * PCollection<TableRow> weatherData = pipeline.apply(
@@ -146,12 +147,11 @@ import org.slf4j.LoggerFactory;
  *
  * <h3>Writing</h3>
  *
- * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation.
- * This consumes either a {@link PCollection} of {@link TableRow TableRows} as input when using
- * {@link BigQueryIO#writeTableRows()} or of a user-defined type when using
- * {@link BigQueryIO#write()}. When using a user-defined type, a function must be provided to
- * turn this type into a {@link TableRow} using
- * {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
+ * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes
+ * either a {@link PCollection} of {@link TableRow TableRows} as input when using {@link
+ * BigQueryIO#writeTableRows()} or of a user-defined type when using {@link BigQueryIO#write()}.
+ * When using a user-defined type, a function must be provided to turn this type into a {@link
+ * TableRow} using {@link BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
  *
  * <pre>{@code
  * PCollection<TableRow> quotes = ...
@@ -216,8 +216,8 @@ import org.slf4j.LoggerFactory;
  * can also be useful when writing to a single table, as it allows a previous stage to calculate the
  * schema (possibly based on the full collection of records being written to BigQuery).
  *
- * <p>For the most general form of dynamic table destinations and schemas, look at
- * {@link BigQueryIO.Write#to(DynamicDestinations)}.
+ * <p>For the most general form of dynamic table destinations and schemas, look at {@link
+ * BigQueryIO.Write#to(DynamicDestinations)}.
  *
  * <h3>Permissions</h3>
  *
@@ -227,6 +227,15 @@ import org.slf4j.LoggerFactory;
  *
  * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
  * </a> for security and permission related information specific to BigQuery.
+ *
+ * <h3>Insertion Method</h3>
+ *
+ * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using
+ * {@link BigQueryIO.Write#withMethod}. If no method is supplied, then a default method will be
+ * chosen based on the input PCollection. See {@link BigQueryIO.Write.Method} for more information
+ * about the methods. The different insertion methods provide different tradeoffs of cost, quota,
+ * and data consistency; please see BigQuery documentation for more information about these
+ * tradeoffs.
  */
 public class BigQueryIO {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
@@ -757,6 +766,8 @@ public class BigQueryIO {
         .setBigQueryServices(new BigQueryServicesImpl())
         .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
         .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
+        .setNumFileShards(0)
+        .setMethod(Write.Method.DEFAULT)
         .build();
   }
 
@@ -771,6 +782,41 @@ public class BigQueryIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
+    /** Determines the method used to insert data in BigQuery. */
+    public enum Method {
+      /**
+       * The default behavior if no method is explicitly set. If the input is bounded, then file
+       * loads will be used. If the input is unbounded, then streaming inserts will be used.
+       */
+      DEFAULT,
+
+      /**
+       * Use BigQuery load jobs to insert data. Records will first be written to files, and these
+       * files will be loaded into BigQuery. This is the default method when the input is bounded.
+       * This method can be chosen for unbounded inputs as well, as long as a triggering frequency
+       * is also set using {@link #withTriggeringFrequency}. BigQuery has daily quotas on the number
+       * of load jobs allowed per day, so be careful not to trigger load jobs too frequently. For
+       * more information, see <a
+       * href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from
+       * Cloud Storage</a>.
+       */
+      FILE_LOADS,
+
+      /**
+       * Use the BigQuery streaming insert API to insert data. This provides the lowest-latency
+       * insert path into BigQuery, and therefore is the default method when the input is unbounded.
+       * BigQuery will make a strong effort to ensure no duplicates when using this path, however
+       * there are some scenarios in which BigQuery is unable to make this guarantee (see
+       * https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over
+       * the output table to periodically clean these rare duplicates. Alternatively, using the
+       * {@link #FILE_LOADS} insert method does guarantee no duplicates, though the latency for the
+       * insert into BigQuery will be much higher. For more information, see <a
+       * href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
+       * BigQuery</a>.
+       */
+      STREAMING_INSERTS
+    }
+
     @Nullable abstract ValueProvider<String> getJsonTableRef();
     @Nullable abstract SerializableFunction<ValueInSingleWindow<T>, TableDestination>
       getTableFunction();
@@ -787,6 +833,14 @@ public class BigQueryIO {
     abstract BigQueryServices getBigQueryServices();
     @Nullable abstract Integer getMaxFilesPerBundle();
     @Nullable abstract Long getMaxFileSize();
+
+    abstract int getNumFileShards();
+
+    @Nullable
+    abstract Duration getTriggeringFrequency();
+
+    abstract Method getMethod();
+
     @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
 
     abstract Builder<T> toBuilder();
@@ -807,6 +861,13 @@ public class BigQueryIO {
       abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
       abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);
       abstract Builder<T> setMaxFileSize(Long maxFileSize);
+
+      abstract Builder<T> setNumFileShards(int numFileShards);
+
+      abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency);
+
+      abstract Builder<T> setMethod(Method method);
+
       abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
 
       abstract Write<T> build();
@@ -992,6 +1053,40 @@ public class BigQueryIO {
       return toBuilder().setValidate(false).build();
     }
 
+    /**
+     * Choose the method used to write data to BigQuery. See the Javadoc on {@link Method} for
+     * information and restrictions of the different methods.
+     */
+    public Write<T> withMethod(Method method) {
+      return toBuilder().setMethod(method).build();
+    }
+
+    /**
+     * Choose the frequency at which file writes are triggered.
+     *
+     * <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, and
+     * only when writing an unbounded {@link PCollection}.
+     *
+     * <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data
+     * written since the last load job. BigQuery has limits on how many load jobs can be triggered
+     * per day, so be careful not to set this duration too low, or you may exceed daily quota. Often
+     * this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery
+     * quota. See <a href="https://cloud.google.com/bigquery/quota-policy">Quota Policy</a> for more
+     * information about BigQuery quotas.
+     */
+    public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {
+      return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
+    }
+
+    /**
+     * Control how many file shards are written when using BigQuery load jobs. Applicable only when
+     * also setting {@link #withTriggeringFrequency}. The default value is 1000.
+     */
+    @Experimental
+    public Write<T> withNumFileShards(int numFileShards) {
+      return toBuilder().setNumFileShards(numFileShards).build();
+    }
+
     @VisibleForTesting
     Write<T> withTestServices(BigQueryServices testServices) {
       return toBuilder().setBigQueryServices(testServices).build();
@@ -1029,6 +1124,17 @@ public class BigQueryIO {
       }
     }
 
+    private Method resolveMethod(PCollection<T> input) {
+      if (getMethod() != Method.DEFAULT) {
+        return getMethod();
+      }
+      // By default, when writing an Unbounded PCollection, we use StreamingInserts and
+      // BigQuery's streaming import API.
+      return (input.isBounded() == IsBounded.UNBOUNDED)
+          ? Method.STREAMING_INSERTS
+          : Method.FILE_LOADS;
+    }
+
     @Override
     public WriteResult expand(PCollection<T> input) {
       // We must have a destination to write to!
@@ -1048,6 +1154,7 @@ public class BigQueryIO {
               || getSchemaFromView() != null,
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
+
       List<?> allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(),
           getDynamicDestinations());
       checkArgument(1
@@ -1061,7 +1168,21 @@ public class BigQueryIO {
           "No more than one of jsonSchema, schemaFromView, or dynamicDestinations may "
               + "be set");
 
-
+      Method method = resolveMethod(input);
+      if (input.isBounded() == IsBounded.UNBOUNDED && method == Method.FILE_LOADS) {
+        checkArgument(
+            getTriggeringFrequency() != null,
+            "When writing an unbounded PCollection via FILE_LOADS, "
+                + "triggering frequency must be specified");
+      } else {
+        checkArgument(
+            getTriggeringFrequency() == null && getNumFileShards() == 0,
+            "Triggering frequency or number of file shards can be specified only when writing "
+                + "an unbounded PCollection via FILE_LOADS, but: the collection was %s "
+                + "and the method was %s",
+            input.isBounded(),
+            method);
+      }
       DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
       if (dynamicDestinations == null) {
         if (getJsonTableRef() != null) {
@@ -1069,17 +1190,20 @@ public class BigQueryIO {
               DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
                   getJsonTableRef(), getTableDescription());
         } else if (getTableFunction() != null) {
-          dynamicDestinations = new TableFunctionDestinations(getTableFunction());
+          dynamicDestinations = new TableFunctionDestinations<>(getTableFunction());
         }
 
         // Wrap with a DynamicDestinations class that will provide a schema. There might be no
         // schema provided if the create disposition is CREATE_NEVER.
         if (getJsonSchema() != null) {
           dynamicDestinations =
-              new ConstantSchemaDestinations(dynamicDestinations, getJsonSchema());
+              new ConstantSchemaDestinations<>(
+                  (DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());
         } else if (getSchemaFromView() != null) {
           dynamicDestinations =
-              new SchemaFromViewDestinations(dynamicDestinations, getSchemaFromView());
+              new SchemaFromViewDestinations<>(
+                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+                  getSchemaFromView());
         }
       }
       return expandTyped(input, dynamicDestinations);
@@ -1100,9 +1224,9 @@ public class BigQueryIO {
               .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
               .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
 
-      // When writing an Unbounded PCollection, we use StreamingInserts and BigQuery's streaming
-      // import API.
-      if (input.isBounded() == IsBounded.UNBOUNDED) {
+      Method method = resolveMethod(input);
+
+      if (method == Method.STREAMING_INSERTS) {
         checkArgument(
             getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
@@ -1129,6 +1253,8 @@ public class BigQueryIO {
         if (getMaxFileSize() != null) {
           batchLoads.setMaxFileSize(getMaxFileSize());
         }
+        batchLoads.setTriggeringFrequency(getTriggeringFrequency());
+        batchLoads.setNumFileShards(getNumFileShards());
         return rowsWithDestination.apply(batchLoads);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
new file mode 100644
index 0000000..18a359c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/ReifyAsIterable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * This transform turns a side input into a singleton PCollection that can be used as the main
+ * input for another transform.
+ */
+public class ReifyAsIterable<T> extends PTransform<PCollection<T>, PCollection<Iterable<T>>> {
+  @Override
+  public PCollection<Iterable<T>> expand(PCollection<T> input) {
+    final PCollectionView<Iterable<T>> view = input.apply(View.<T>asIterable());
+    return input
+        .getPipeline()
+        .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+        .apply(
+            ParDo.of(
+                    new DoFn<Void, Iterable<T>>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        c.output(c.sideInput(view));
+                      }
+                    })
+                .withSideInputs(view));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index e1ed746..e337f94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -66,9 +66,10 @@ class WriteBundlesToFiles<DestinationT>
   private transient Map<DestinationT, TableRowWriter> writers;
   private transient Map<DestinationT, BoundedWindow> writerWindows;
   private final PCollectionView<String> tempFilePrefixView;
-  private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
+  private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
+  private int spilledShardNumber;
 
   /**
    * The result of the {@link WriteBundlesToFiles} transform. Corresponds to a single output file,
@@ -133,11 +134,11 @@ class WriteBundlesToFiles<DestinationT>
 
   WriteBundlesToFiles(
       PCollectionView<String> tempFilePrefixView,
-      TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
+      TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittenRecordsTag,
       int maxNumWritersPerBundle,
       long maxFileSize) {
     this.tempFilePrefixView = tempFilePrefixView;
-    this.unwrittedRecordsTag = unwrittedRecordsTag;
+    this.unwrittenRecordsTag = unwrittenRecordsTag;
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
     this.maxFileSize = maxFileSize;
   }
@@ -148,6 +149,7 @@ class WriteBundlesToFiles<DestinationT>
     // bundles.
     this.writers = Maps.newHashMap();
     this.writerWindows = Maps.newHashMap();
+    this.spilledShardNumber = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
   }
 
   TableRowWriter createAndInsertWriter(DestinationT destination, String tempFilePrefix,
@@ -174,9 +176,10 @@ class WriteBundlesToFiles<DestinationT>
       } else {
         // This means that we already had too many writers open in this bundle. "spill" this record
         // into the output. It will be grouped and written to a file in a subsequent stage.
-        c.output(unwrittedRecordsTag,
-            KV.of(ShardedKey.of(destination,
-                ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR)),
+        c.output(
+            unwrittenRecordsTag,
+            KV.of(
+                ShardedKey.of(destination, (++spilledShardNumber) % SPILLED_RECORD_SHARDING_FACTOR),
                 c.element().getValue()));
         return;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index 451d1bd..934f1ae 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
@@ -34,12 +35,13 @@ import org.apache.beam.sdk.values.TupleTag;
  * tablespec and the list of files corresponding to each partition of that table.
  */
 class WritePartition<DestinationT>
-    extends DoFn<Void, KV<ShardedKey<DestinationT>, List<String>>> {
+    extends DoFn<
+        Iterable<WriteBundlesToFiles.Result<DestinationT>>,
+        KV<ShardedKey<DestinationT>, List<String>>> {
   private final boolean singletonTable;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final PCollectionView<String> tempFilePrefix;
-  private final PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results;
-  private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
+  @Nullable private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
   private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
 
   private static class PartitionData {
@@ -104,12 +106,10 @@ class WritePartition<DestinationT>
       boolean singletonTable,
       DynamicDestinations<?, DestinationT> dynamicDestinations,
       PCollectionView<String> tempFilePrefix,
-      PCollectionView<Iterable<WriteBundlesToFiles.Result<DestinationT>>> results,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
       TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag) {
     this.singletonTable = singletonTable;
     this.dynamicDestinations = dynamicDestinations;
-    this.results = results;
     this.tempFilePrefix = tempFilePrefix;
     this.multiPartitionsTag = multiPartitionsTag;
     this.singlePartitionTag = singlePartitionTag;
@@ -117,8 +117,7 @@ class WritePartition<DestinationT>
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    List<WriteBundlesToFiles.Result<DestinationT>> results =
-        Lists.newArrayList(c.sideInput(this.results));
+    List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element());
 
     // If there are no elements to write _and_ the user specified a constant output table, then
     // generate an empty table of that name.

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index f641b32..eb1da5f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -22,9 +22,11 @@ import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -35,74 +37,85 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Copies temporary tables to destination table.
+ * Copies temporary tables to destination table. The input element is an {@link Iterable} that
+ * provides the list of all temporary tables created for a given {@link TableDestination}.
  */
-class WriteRename extends DoFn<Void, Void> {
+class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
 
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
-  private final WriteDisposition writeDisposition;
-  private final CreateDisposition createDisposition;
-  // Map from final destination to a list of temporary tables that need to be copied into it.
-  private final PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView;
 
+  // In the triggered scenario, the user-supplied create and write dispositions only apply to the
+  // first trigger pane, as that's when the table is created. Subsequent loads should always
+  // append to the table, and so use CREATE_NEVER and WRITE_APPEND dispositions respectively.
+  private final WriteDisposition firstPaneWriteDisposition;
+  private final CreateDisposition firstPaneCreateDisposition;
 
   public WriteRename(
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
       WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView) {
+      CreateDisposition createDisposition) {
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
-    this.writeDisposition = writeDisposition;
-    this.createDisposition = createDisposition;
-    this.tempTablesView = tempTablesView;
+    this.firstPaneWriteDisposition = writeDisposition;
+    this.firstPaneCreateDisposition = createDisposition;
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
-    Map<TableDestination, Iterable<String>> tempTablesMap =
-        Maps.newHashMap(c.sideInput(tempTablesView));
-
-    // Process each destination table.
-    for (Map.Entry<TableDestination, Iterable<String>> entry : tempTablesMap.entrySet()) {
-      TableDestination finalTableDestination = entry.getKey();
-      List<String> tempTablesJson = Lists.newArrayList(entry.getValue());
-      // Do not copy if no temp tables are provided
-      if (tempTablesJson.size() == 0) {
-        return;
-      }
+    Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
+    for (KV<TableDestination, String> entry : c.element()) {
+      tempTables.put(entry.getKey(), entry.getValue());
+    }
+    for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
+      // Process each destination table.
+      writeRename(entry.getKey(), entry.getValue(), c);
+    }
+  }
 
-      List<TableReference> tempTables = Lists.newArrayList();
-      for (String table : tempTablesJson) {
-        tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
-      }
+  private void writeRename(
+      TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
+      throws Exception {
+    WriteDisposition writeDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+    CreateDisposition createDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+    List<String> tempTablesJson = Lists.newArrayList(tempTableNames);
+    // Do not copy if no temp tables are provided
+    if (tempTablesJson.size() == 0) {
+      return;
+    }
 
-      // Make sure each destination table gets a unique job id.
-      String jobIdPrefix = BigQueryHelpers.createJobId(
-          c.sideInput(jobIdToken), finalTableDestination, -1);
-
-      copy(
-          bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-          jobIdPrefix,
-          finalTableDestination.getTableReference(),
-          tempTables,
-          writeDisposition,
-          createDisposition,
-          finalTableDestination.getTableDescription());
-
-      DatasetService tableService =
-          bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
-      removeTemporaryTables(tableService, tempTables);
+    List<TableReference> tempTables = Lists.newArrayList();
+    for (String table : tempTablesJson) {
+      tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
     }
+
+    // Make sure each destination table gets a unique job id.
+    String jobIdPrefix =
+        BigQueryHelpers.createJobId(c.sideInput(jobIdToken), finalTableDestination, -1);
+
+    copy(
+        bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+        jobIdPrefix,
+        finalTableDestination.getTableReference(),
+        tempTables,
+        writeDisposition,
+        createDisposition,
+        finalTableDestination.getTableDescription());
+
+    DatasetService tableService =
+        bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+    removeTemporaryTables(tableService, tempTables);
   }
 
   private void copy(
@@ -174,9 +187,11 @@ class WriteRename extends DoFn<Void, Void> {
     super.populateDisplayData(builder);
 
     builder
-        .add(DisplayData.item("writeDisposition", writeDisposition.toString())
-            .withLabel("Write Disposition"))
-        .add(DisplayData.item("createDisposition", createDisposition.toString())
-            .withLabel("Create Disposition"));
+        .add(
+            DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString())
+                .withLabel("Write Disposition"))
+        .add(
+            DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
+                .withLabel("Create Disposition"));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ed2916..24911a7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -26,6 +26,7 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -65,35 +66,48 @@ class WriteTables<DestinationT>
   private final boolean singlePartition;
   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
-  private final PCollectionView<Map<DestinationT, String>> schemasView;
-  private final WriteDisposition writeDisposition;
-  private final CreateDisposition createDisposition;
+  private final WriteDisposition firstPaneWriteDisposition;
+  private final CreateDisposition firstPaneCreateDisposition;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
 
   public WriteTables(
       boolean singlePartition,
       BigQueryServices bqServices,
       PCollectionView<String> jobIdToken,
-      PCollectionView<Map<DestinationT, String>> schemasView,
       WriteDisposition writeDisposition,
       CreateDisposition createDisposition,
       DynamicDestinations<?, DestinationT> dynamicDestinations) {
     this.singlePartition = singlePartition;
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
-    this.schemasView = schemasView;
-    this.writeDisposition = writeDisposition;
-    this.createDisposition = createDisposition;
+    this.firstPaneWriteDisposition = writeDisposition;
+    this.firstPaneCreateDisposition = createDisposition;
     this.dynamicDestinations = dynamicDestinations;
   }
 
+  @StartBundle
+  public void startBundle(StartBundleContext c) {
+    // Clear the map on each bundle so we can notice side-input updates.
+    // (alternative is to use a cache with a TTL).
+    jsonSchemas.clear();
+  }
+
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     dynamicDestinations.setSideInputAccessorFromProcessContext(c);
     DestinationT destination = c.element().getKey().getKey();
-    TableSchema tableSchema =
-        BigQueryHelpers.fromJsonString(
-            c.sideInput(schemasView).get(destination), TableSchema.class);
+    TableSchema tableSchema;
+    String jsonSchema = jsonSchemas.get(destination);
+    if (jsonSchema != null) {
+      tableSchema = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
+    } else {
+      tableSchema = dynamicDestinations.getSchema(destination);
+      if (tableSchema != null) {
+        jsonSchemas.put(destination, BigQueryHelpers.toJsonString(tableSchema));
+      }
+    }
+
     TableDestination tableDestination = dynamicDestinations.getTable(destination);
     TableReference tableReference = tableDestination.getTableReference();
     if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
@@ -112,6 +126,10 @@ class WriteTables<DestinationT>
       tableReference.setTableId(jobIdPrefix);
     }
 
+    WriteDisposition writeDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+    CreateDisposition createDisposition =
+        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
     load(
         bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
         bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),

http://git-wip-us.apache.org/repos/asf/beam/blob/075d4d45/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 8db4e94..3d53b7e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -47,12 +47,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
@@ -80,7 +82,6 @@ import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -91,6 +92,7 @@ import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
@@ -105,6 +107,9 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
@@ -130,20 +135,19 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.Matchers;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
@@ -540,65 +544,73 @@ public class BigQueryIOTest implements Serializable {
     if (streaming) {
       users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
     }
-    users.apply("WriteBigQuery", BigQueryIO.<String>write()
+    users.apply(
+        "WriteBigQuery",
+        BigQueryIO.<String>write()
             .withTestServices(fakeBqServices)
             .withMaxFilesPerBundle(5)
             .withMaxFileSize(10)
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-            .withFormatFunction(new SerializableFunction<String, TableRow>() {
-              @Override
-              public TableRow apply(String user) {
-                Matcher matcher = userPattern.matcher(user);
-                if (matcher.matches()) {
-                  return new TableRow().set("name", matcher.group(1))
-                      .set("id", Integer.valueOf(matcher.group(2)));
-                }
-                throw new RuntimeException("Unmatching element " + user);
-              }
-            })
-            .to(new StringIntegerDestinations() {
-              @Override
-              public Integer getDestination(ValueInSingleWindow<String> element) {
-                assertThat(element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
-                Matcher matcher = userPattern.matcher(element.getValue());
-                if (matcher.matches()) {
-                  // Since we name tables by userid, we can simply store an Integer to represent
-                  // a table.
-                  return Integer.valueOf(matcher.group(2));
-                }
-                throw new RuntimeException("Unmatching destination " + element.getValue());
-              }
+            .withFormatFunction(
+                new SerializableFunction<String, TableRow>() {
+                  @Override
+                  public TableRow apply(String user) {
+                    Matcher matcher = userPattern.matcher(user);
+                    if (matcher.matches()) {
+                      return new TableRow()
+                          .set("name", matcher.group(1))
+                          .set("id", Integer.valueOf(matcher.group(2)));
+                    }
+                    throw new RuntimeException("Unmatching element " + user);
+                  }
+                })
+            .to(
+                new StringIntegerDestinations() {
+                  @Override
+                  public Integer getDestination(ValueInSingleWindow<String> element) {
+                    assertThat(
+                        element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class));
+                    Matcher matcher = userPattern.matcher(element.getValue());
+                    if (matcher.matches()) {
+                      // Since we name tables by userid, we can simply store an Integer to represent
+                      // a table.
+                      return Integer.valueOf(matcher.group(2));
+                    }
+                    throw new RuntimeException("Unmatching destination " + element.getValue());
+                  }
 
-              @Override
-              public TableDestination getTable(Integer userId) {
-                verifySideInputs();
-                // Each user in it's own table.
-                return new TableDestination("dataset-id.userid-" + userId,
-                    "table for userid " + userId);
-              }
+                  @Override
+                  public TableDestination getTable(Integer userId) {
+                    verifySideInputs();
+                    // Each user in its own table.
+                    return new TableDestination(
+                        "dataset-id.userid-" + userId, "table for userid " + userId);
+                  }
 
-              @Override
-              public TableSchema getSchema(Integer userId) {
-                verifySideInputs();
-                return new TableSchema().setFields(
-                    ImmutableList.of(
-                        new TableFieldSchema().setName("name").setType("STRING"),
-                        new TableFieldSchema().setName("id").setType("INTEGER")));
-              }
+                  @Override
+                  public TableSchema getSchema(Integer userId) {
+                    verifySideInputs();
+                    return new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("name").setType("STRING"),
+                                new TableFieldSchema().setName("id").setType("INTEGER")));
+                  }
 
-              @Override
-              public List<PCollectionView<?>> getSideInputs() {
-                return ImmutableList.of(sideInput1, sideInput2);
-              }
+                  @Override
+                  public List<PCollectionView<?>> getSideInputs() {
+                    return ImmutableList.of(sideInput1, sideInput2);
+                  }
 
-              private void verifySideInputs() {
-                assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
-                Map<String, String> mapSideInput = sideInput(sideInput2);
-                assertEquals(3, mapSideInput.size());
-                assertThat(mapSideInput,
-                    allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
-              }
-            })
+                  private void verifySideInputs() {
+                    assertThat(sideInput(sideInput1), containsInAnyOrder("a", "b", "c"));
+                    Map<String, String> mapSideInput = sideInput(sideInput2);
+                    assertEquals(3, mapSideInput.size());
+                    assertThat(
+                        mapSideInput,
+                        allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c")));
+                  }
+                })
             .withoutValidation());
     p.run();
 
@@ -626,6 +638,59 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testTriggeredFileLoads() throws Exception {
+    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("project-id");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    FakeDatasetService datasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices()
+            .withJobService(new FakeJobService())
+            .withDatasetService(datasetService);
+
+    List<TableRow> elements = Lists.newArrayList();
+    for (int i = 0; i < 30; ++i) {
+      elements.add(new TableRow().set("number", i));
+    }
+
+    datasetService.createDataset("project-id", "dataset-id", "", "");
+    TestStream<TableRow> testStream =
+        TestStream.create(TableRowJsonCoder.of())
+            .addElements(
+                elements.get(0), Iterables.toArray(elements.subList(1, 10), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                elements.get(10), Iterables.toArray(elements.subList(11, 20), TableRow.class))
+            .advanceProcessingTime(Duration.standardMinutes(1))
+            .addElements(
+                elements.get(20), Iterables.toArray(elements.subList(21, 30), TableRow.class))
+            .advanceWatermarkToInfinity();
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(testStream)
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id")
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("number").setType("INTEGER"))))
+                .withTestServices(fakeBqServices)
+                .withTriggeringFrequency(Duration.standardSeconds(30))
+                .withNumFileShards(2)
+                .withMethod(Method.FILE_LOADS)
+                .withoutValidation());
+    p.run();
+
+    assertThat(
+        datasetService.getAllRows("project-id", "dataset-id", "table-id"),
+        containsInAnyOrder(Iterables.toArray(elements, TableRow.class)));
+  }
+
+  @Test
   public void testRetryPolicy() throws Exception {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("project-id");
@@ -1796,25 +1861,24 @@ public class BigQueryIOTest implements Serializable {
     TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
         new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
 
-    PCollectionView<Iterable<WriteBundlesToFiles.Result<TableDestination>>> resultsView =
-        p.apply(
-                Create.of(files)
-                    .withCoder(WriteBundlesToFiles.ResultCoder.of(TableDestinationCoder.of())))
-            .apply(View.<WriteBundlesToFiles.Result<TableDestination>>asIterable());
-
     String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
     PCollectionView<String> tempFilePrefixView =
         p.apply(Create.of(tempFilePrefix)).apply(View.<String>asSingleton());
 
     WritePartition<TableDestination> writePartition =
-        new WritePartition<>(isSingleton, dynamicDestinations, tempFilePrefixView,
-            resultsView, multiPartitionsTag, singlePartitionTag);
-
-    DoFnTester<Void, KV<ShardedKey<TableDestination>, List<String>>> tester =
-        DoFnTester.of(writePartition);
-    tester.setSideInput(resultsView, GlobalWindow.INSTANCE, files);
+        new WritePartition<>(
+            isSingleton,
+            dynamicDestinations,
+            tempFilePrefixView,
+            multiPartitionsTag,
+            singlePartitionTag);
+
+    DoFnTester<
+            Iterable<WriteBundlesToFiles.Result<TableDestination>>,
+            KV<ShardedKey<TableDestination>, List<String>>>
+        tester = DoFnTester.of(writePartition);
     tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
-    tester.processElement(null);
+    tester.processElement(files);
 
     List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
@@ -1864,7 +1928,7 @@ public class BigQueryIOTest implements Serializable {
 
     @Override
     public TableSchema getSchema(String destination) {
-      throw new UnsupportedOperationException("getSchema not expected in this test.");
+      return null;
     }
   }
 
@@ -1926,16 +1990,11 @@ public class BigQueryIOTest implements Serializable {
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());
 
-    PCollectionView<Map<String, String>> schemaMapView =
-        p.apply("CreateEmptySchema",
-            Create.empty(new TypeDescriptor<KV<String, String>>() {}))
-            .apply(View.<String, String>asMap());
     WriteTables<String> writeTables =
         new WriteTables<>(
             false,
             fakeBqServices,
             jobIdTokenView,
-            schemaMapView,
             WriteDisposition.WRITE_EMPTY,
             CreateDisposition.CREATE_IF_NEEDED,
             new IdentityDynamicTables());
@@ -1943,7 +2002,6 @@ public class BigQueryIOTest implements Serializable {
     DoFnTester<KV<ShardedKey<String>, List<String>>,
         KV<TableDestination, String>> tester = DoFnTester.of(writeTables);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.setSideInput(schemaMapView, GlobalWindow.INSTANCE, ImmutableMap.<String, String>of());
     tester.getPipelineOptions().setTempLocation("tempLocation");
     for (KV<ShardedKey<String>, List<String>> partition : partitions) {
       tester.processElement(partition);
@@ -1999,21 +2057,14 @@ public class BigQueryIOTest implements Serializable {
     final int numTempTablesPerFinalTable = 3;
     final int numRecordsPerTempTable = 10;
 
-    Map<TableDestination, List<TableRow>> expectedRowsPerTable = Maps.newHashMap();
+    Multimap<TableDestination, TableRow> expectedRowsPerTable = ArrayListMultimap.create();
     String jobIdToken = "jobIdToken";
-    Map<TableDestination, Iterable<String>> tempTables = Maps.newHashMap();
+    Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
+    List<KV<TableDestination, String>> tempTablesElement = Lists.newArrayList();
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
       TableDestination tableDestination = new TableDestination(
           tableName, "table_" + i + "_desc");
-      List<String> tables = Lists.newArrayList();
-      tempTables.put(tableDestination, tables);
-
-      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
-      if (expectedRows == null) {
-        expectedRows = Lists.newArrayList();
-        expectedRowsPerTable.put(tableDestination, expectedRows);
-      }
       for (int j = 0; i < numTempTablesPerFinalTable; ++i) {
         TableReference tempTable = new TableReference()
             .setProjectId("project-id")
@@ -2026,56 +2077,36 @@ public class BigQueryIOTest implements Serializable {
           rows.add(new TableRow().set("number", j * numTempTablesPerFinalTable + k));
         }
         datasetService.insertAll(tempTable, rows, null);
-        expectedRows.addAll(rows);
-        tables.add(BigQueryHelpers.toJsonString(tempTable));
+        expectedRowsPerTable.putAll(tableDestination, rows);
+        String tableJson = BigQueryHelpers.toJsonString(tempTable);
+        tempTables.put(tableDestination, tableJson);
+        tempTablesElement.add(KV.of(tableDestination, tableJson));
       }
     }
 
-    PCollection<KV<TableDestination, String>> tempTablesPCollection =
-        p.apply(Create.of(tempTables)
-            .withCoder(KvCoder.of(TableDestinationCoder.of(),
-                IterableCoder.of(StringUtf8Coder.of()))))
-            .apply(ParDo.of(new DoFn<KV<TableDestination, Iterable<String>>,
-                KV<TableDestination, String>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                TableDestination tableDestination = c.element().getKey();
-                for (String tempTable : c.element().getValue()) {
-                  c.output(KV.of(tableDestination, tempTable));
-                }
-              }
-            }));
-
-    PCollectionView<Map<TableDestination, Iterable<String>>> tempTablesView =
-        PCollectionViews.multimapView(
-            tempTablesPCollection,
-        WindowingStrategy.globalDefault(),
-        KvCoder.of(TableDestinationCoder.of(),
-            StringUtf8Coder.of()));
 
     PCollectionView<String> jobIdTokenView = p
         .apply("CreateJobId", Create.of("jobId"))
         .apply(View.<String>asSingleton());
 
-    WriteRename writeRename = new WriteRename(
-        fakeBqServices,
-        jobIdTokenView,
-        WriteDisposition.WRITE_EMPTY,
-        CreateDisposition.CREATE_IF_NEEDED,
-        tempTablesView);
+    WriteRename writeRename =
+        new WriteRename(
+            fakeBqServices,
+            jobIdTokenView,
+            WriteDisposition.WRITE_EMPTY,
+            CreateDisposition.CREATE_IF_NEEDED);
 
-    DoFnTester<Void, Void> tester = DoFnTester.of(writeRename);
-    tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables);
+    DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.processElement(null);
+    tester.processElement(tempTablesElement);
 
-    for (Map.Entry<TableDestination, Iterable<String>> entry : tempTables.entrySet()) {
+    for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
       TableDestination tableDestination = entry.getKey();
       TableReference tableReference = tableDestination.getTableReference();
       Table table = checkNotNull(datasetService.getTable(tableReference));
       assertEquals(tableReference.getTableId() + "_desc", tableDestination.getTableDescription());
 
-      List<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
+      Collection<TableRow> expectedRows = expectedRowsPerTable.get(tableDestination);
       assertThat(datasetService.getAllRows(tableReference.getProjectId(),
           tableReference.getDatasetId(), tableReference.getTableId()),
           containsInAnyOrder(Iterables.toArray(expectedRows, TableRow.class)));


[2/2] beam git commit: This closes #3662: [BEAM-2700] Support load jobs in streaming

Posted by jk...@apache.org.
This closes #3662: [BEAM-2700] Support load jobs in streaming


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7e8f886
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7e8f886
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7e8f886

Branch: refs/heads/master
Commit: f7e8f886c91ea9d0b51e00331eeb4484e2f6e000
Parents: 0f8e8dd 075d4d4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Aug 14 14:32:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Aug 14 14:32:14 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 447 +++++++++++++------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 168 ++++++-
 .../sdk/io/gcp/bigquery/ReifyAsIterable.java    |  51 +++
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |  15 +-
 .../sdk/io/gcp/bigquery/WritePartition.java     |  13 +-
 .../beam/sdk/io/gcp/bigquery/WriteRename.java   | 111 +++--
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |  38 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 269 ++++++-----
 8 files changed, 770 insertions(+), 342 deletions(-)
----------------------------------------------------------------------