You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/06/05 18:09:30 UTC

[1/2] beam git commit: Make batch loads repeatable across different invocations of the same template job.

Repository: beam
Updated Branches:
  refs/heads/master 6543e56d2 -> ad2c1f1fc


Make batch loads repeatable across different invocations of the same template job.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/94d8547d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/94d8547d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/94d8547d

Branch: refs/heads/master
Commit: 94d8547d604dc7291cb8a72a12453a3dfe6a5f9a
Parents: 6543e56
Author: Reuven Lax <re...@google.com>
Authored: Sun Apr 23 12:53:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 5 10:54:53 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 50 +++++++++++---------
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 12 ++---
 2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index c1b202e..4393a63 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -60,10 +60,14 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
 class BatchLoads<DestinationT>
     extends PTransform<PCollection<KV<DestinationT, TableRow>>, WriteResult> {
+  static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
+
   // The maximum number of file writers to keep open in a single bundle at a time, since file
   // writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
   // The first 20 tables from a single BatchLoads transform will write files inline in the
@@ -161,28 +165,10 @@ class BatchLoads<DestinationT>
   @Override
   public WriteResult expand(PCollection<KV<DestinationT, TableRow>> input) {
     Pipeline p = input.getPipeline();
-    final String stepUuid = BigQueryHelpers.randomUUIDString();
-
-    PCollectionView<String> tempFilePrefix =
-        p.apply("Create", Create.of((Void) null))
-            .apply(
-                "GetTempFilePrefix",
-                ParDo.of(
-                    new DoFn<Void, String>() {
-                      @ProcessElement
-                      public void getTempFilePrefix(ProcessContext c) {
-                        c.output(
-                            resolveTempLocation(
-                                c.getPipelineOptions().getTempLocation(),
-                                "BigQueryWriteTemp",
-                                stepUuid));
-                      }
-                    }))
-            .apply("TempFilePrefixView", View.<String>asSingleton());
 
     // Create a singleton job ID token at execution time. This will be used as the base for all
     // load jobs issued from this instance of the transform.
-    PCollectionView<String> jobIdTokenView =
+    final PCollection<String> jobIdToken =
         p.apply("TriggerIdCreation", Create.of("ignored"))
             .apply(
                 "CreateJobId",
@@ -190,10 +176,27 @@ class BatchLoads<DestinationT>
                     new SimpleFunction<String, String>() {
                       @Override
                       public String apply(String input) {
-                        return stepUuid;
+                        return BigQueryHelpers.randomUUIDString();
+                      }
+                    }));
+    final PCollectionView<String> jobIdTokenView = jobIdToken.apply(View.<String>asSingleton());
+
+    PCollectionView<String> tempFilePrefix = jobIdToken
+            .apply(
+                "GetTempFilePrefix",
+                ParDo.of(
+                    new DoFn<String, String>() {
+                      @ProcessElement
+                      public void getTempFilePrefix(ProcessContext c) {
+                        String tempLocation = resolveTempLocation(
+                            c.getPipelineOptions().getTempLocation(),
+                            "BigQueryWriteTemp", c.element());
+                        LOG.info("Writing BigQuery temporary files to {} before loading them.",
+                            tempLocation);
+                        c.output(tempLocation);
                       }
                     }))
-            .apply(View.<String>asSingleton());
+            .apply("TempFilePrefixView", View.<String>asSingleton());
 
     PCollection<KV<DestinationT, TableRow>> inputInGlobalWindow =
         input.apply(
@@ -210,9 +213,10 @@ class BatchLoads<DestinationT>
         new TupleTag<KV<ShardedKey<DestinationT>, TableRow>>("unwrittenRecords") {};
     PCollectionTuple writeBundlesTuple = inputInGlobalWindow
             .apply("WriteBundlesToFiles",
-                ParDo.of(new WriteBundlesToFiles<>(stepUuid, unwrittedRecordsTag,
+                ParDo.of(new WriteBundlesToFiles<>(tempFilePrefix, unwrittedRecordsTag,
                     maxNumWritersPerBundle, maxFileSize))
-                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
+                    .withSideInputs(tempFilePrefix)
+                    .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
     PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
         writeBundlesTuple.get(writtenFilesTag)
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));

http://git-wip-us.apache.org/repos/asf/beam/blob/94d8547d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
index 0b5f54b..d68779a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteBundlesToFiles.java
@@ -19,8 +19,6 @@
 package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
-
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -41,6 +39,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +63,7 @@ class WriteBundlesToFiles<DestinationT>
   // Map from tablespec to a writer for that table.
   private transient Map<DestinationT, TableRowWriter> writers;
   private transient Map<DestinationT, BoundedWindow> writerWindows;
-  private final String stepUuid;
+  private final PCollectionView<String> tempFilePrefixView;
   private final TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag;
   private int maxNumWritersPerBundle;
   private long maxFileSize;
@@ -131,11 +130,11 @@ class WriteBundlesToFiles<DestinationT>
   }
 
   WriteBundlesToFiles(
-      String stepUuid,
+      PCollectionView<String> tempFilePrefixView,
       TupleTag<KV<ShardedKey<DestinationT>, TableRow>> unwrittedRecordsTag,
       int maxNumWritersPerBundle,
       long maxFileSize) {
-    this.stepUuid = stepUuid;
+    this.tempFilePrefixView = tempFilePrefixView;
     this.unwrittedRecordsTag = unwrittedRecordsTag;
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
     this.maxFileSize = maxFileSize;
@@ -159,8 +158,7 @@ class WriteBundlesToFiles<DestinationT>
 
   @ProcessElement
   public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-    String tempFilePrefix = resolveTempLocation(
-        c.getPipelineOptions().getTempLocation(), "BigQueryWriteTemp", stepUuid);
+    String tempFilePrefix = c.sideInput(tempFilePrefixView);
     DestinationT destination = c.element().getKey();
 
     TableRowWriter writer;


[2/2] beam git commit: This closes #2657: [BEAM-2058] Generate BigQuery load job at runtime

Posted by jk...@apache.org.
This closes #2657: [BEAM-2058] Generate BigQuery load job at runtime


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ad2c1f1f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ad2c1f1f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ad2c1f1f

Branch: refs/heads/master
Commit: ad2c1f1fc0fa3b65b8d5befae12eaf983f69cb2c
Parents: 6543e56 94d8547
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Jun 5 10:56:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Jun 5 10:56:01 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 50 +++++++++++---------
 .../io/gcp/bigquery/WriteBundlesToFiles.java    | 12 ++---
 2 files changed, 32 insertions(+), 30 deletions(-)
----------------------------------------------------------------------