Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/16 22:19:37 UTC

[1/2] beam git commit: Add custom tempLocation support to BigQueryIO.

Repository: beam
Updated Branches:
  refs/heads/master ec052bb44 -> a5cbd764b


Add custom tempLocation support to BigQueryIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec58a80c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec58a80c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec58a80c

Branch: refs/heads/master
Commit: ec58a80ca0f913c85d5f17cba3535243cd010876
Parents: ec052bb
Author: Yunqing Zhou <zh...@zhouyunqing-macbookpro.roam.corp.google.com>
Authored: Fri Oct 13 15:52:18 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 16 15:17:59 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 25 ++++++++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 49 ++++++++++++++++----
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 6d832e4..1ccd5d6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -41,6 +41,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -127,11 +128,13 @@ class BatchLoads<DestinationT>
   private long maxFileSize;
   private int numFileShards;
   private Duration triggeringFrequency;
+  private ValueProvider<String> customGcsTempLocation;
 
   BatchLoads(WriteDisposition writeDisposition, CreateDisposition createDisposition,
              boolean singletonTable,
              DynamicDestinations<?, DestinationT> dynamicDestinations,
-             Coder<DestinationT> destinationCoder) {
+             Coder<DestinationT> destinationCoder,
+             ValueProvider<String> customGcsTempLocation) {
     bigQueryServices = new BigQueryServicesImpl();
     this.writeDisposition = writeDisposition;
     this.createDisposition = createDisposition;
@@ -142,6 +145,7 @@ class BatchLoads<DestinationT>
     this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
     this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
     this.triggeringFrequency = null;
+    this.customGcsTempLocation = customGcsTempLocation;
   }
 
   void setTestServices(BigQueryServices bigQueryServices) {
@@ -174,7 +178,16 @@ class BatchLoads<DestinationT>
   @Override
   public void validate(PipelineOptions options) {
     // We will use a BigQuery load job -- validate the temp location.
-    String tempLocation = options.getTempLocation();
+    String tempLocation;
+    if (customGcsTempLocation == null) {
+      tempLocation = options.getTempLocation();
+    } else {
+      if (!customGcsTempLocation.isAccessible()) {
+        // Can't perform verification in this case.
+        return;
+      }
+      tempLocation = customGcsTempLocation.get();
+    }
     checkArgument(
         !Strings.isNullOrEmpty(tempLocation),
         "BigQueryIO.Write needs a GCS temp location to store temp files.");
@@ -359,9 +372,15 @@ class BatchLoads<DestinationT>
                 new DoFn<String, String>() {
                   @ProcessElement
                   public void getTempFilePrefix(ProcessContext c) {
+                    String tempLocationRoot;
+                    if (customGcsTempLocation != null) {
+                      tempLocationRoot = customGcsTempLocation.get();
+                    } else {
+                      tempLocationRoot = c.getPipelineOptions().getTempLocation();
+                    }
                     String tempLocation =
                         resolveTempLocation(
-                            c.getPipelineOptions().getTempLocation(),
+                            tempLocationRoot,
                             "BigQueryWriteTemp",
                             c.element());
                     LOG.info(

http://git-wip-us.apache.org/repos/asf/beam/blob/ec58a80c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2f99643..3dfd8b8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -240,15 +240,6 @@ import org.slf4j.LoggerFactory;
  * <p>For the most general form of dynamic table destinations and schemas, look at {@link
  * BigQueryIO.Write#to(DynamicDestinations)}.
  *
- * <h3>Permissions</h3>
- *
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
- * details.
- *
- * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
- * </a> for security and permission related information specific to BigQuery.
- *
  * <h3>Insertion Method</h3>
  *
  * {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using
@@ -257,6 +248,30 @@ import org.slf4j.LoggerFactory;
  * about the methods. The different insertion methods provide different tradeoffs of cost, quota,
  * and data consistency; please see BigQuery documentation for more information about these
  * tradeoffs.
+ *
+ * <h3>Usage with templates</h3>
+ *
+ * <p>When using {@link #read} or {@link #readTableRows()} in a template, it's required to specify
+ * {@link Read#withTemplateCompatibility()}. Specifying this in a non-template pipeline is not
+ * recommended because it has somewhat lower performance.
+ *
+ * <p>When using {@link #write()} or {@link #writeTableRows()} with batch loads in a template, it is
+ * recommended to specify {@link Write#withCustomGcsTempLocation}. Writing to BigQuery via batch
+ * loads involves writing temporary files to this location, so the location must be accessible at
+ * pipeline execution time. By default, this location is captured at pipeline <i>construction</i>
+ * time, and may be inaccessible if the template is reused from a different project or at a moment
+ * when the original location no longer exists.
+ * {@link Write#withCustomGcsTempLocation(ValueProvider)} allows specifying the location as an
+ * argument to the template invocation.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
+ * </a> for security and permission related information specific to BigQuery.
  */
 public class BigQueryIO {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
@@ -1031,6 +1046,8 @@ public class BigQueryIO {
 
     @Nullable abstract InsertRetryPolicy getFailedInsertRetryPolicy();
 
+    @Nullable abstract ValueProvider<String> getCustomGcsTempLocation();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -1059,6 +1076,8 @@ public class BigQueryIO {
 
       abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
 
+      abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation);
+
       abstract Write<T> build();
     }
 
@@ -1303,6 +1322,15 @@ public class BigQueryIO {
       return toBuilder().setNumFileShards(numFileShards).build();
     }
 
+    /**
+     * Provides a custom location on GCS for storing temporary files to be loaded via BigQuery
+     * batch load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for
+     * discussion.
+     */
+    public Write<T> withCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation) {
+      return toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build();
+    }
+
     @VisibleForTesting
     Write<T> withTestServices(BigQueryServices testServices) {
       return toBuilder().setBigQueryServices(testServices).build();
@@ -1479,7 +1507,8 @@ public class BigQueryIO {
             getCreateDisposition(),
             getJsonTableRef() != null,
             dynamicDestinations,
-            destinationCoder);
+            destinationCoder,
+            getCustomGcsTempLocation());
         batchLoads.setTestServices(getBigQueryServices());
         if (getMaxFilesPerBundle() != null) {
           batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
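
For readers browsing the archive: below is a minimal sketch of how a template pipeline might wire
the new Write#withCustomGcsTempLocation through a ValueProvider pipeline option, per the
"Usage with templates" Javadoc added above. The options interface, option name, destination table,
and schema are hypothetical and shown only to illustrate the wiring; this is not code from the commit.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryCustomTempLocationExample {

  // Hypothetical options interface. Declaring the option as a ValueProvider lets a template
  // supply the GCS temp location at invocation time instead of capturing it at construction time.
  public interface MyOptions extends PipelineOptions {
    ValueProvider<String> getBigQueryTempLocation();
    void setBigQueryTempLocation(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    MyOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    Pipeline p = Pipeline.create(options);

    // Toy schema and row so the example is self-contained.
    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("word").setType("STRING")));

    p.apply(Create.of(new TableRow().set("word", "hello")).withCoder(TableRowJsonCoder.of()))
        .apply(
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")  // hypothetical destination table
                .withSchema(schema)
                // Temp files for the BigQuery batch load job are written under this location,
                // which is resolved at execution time from the template parameter.
                .withCustomGcsTempLocation(options.getBigQueryTempLocation()));

    p.run();
  }
}

A template built from such a pipeline could then (hypothetically) be invoked with a parameter like
--bigQueryTempLocation=gs://some-bucket/bq-temp/ supplied at launch time.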


[2/2] beam git commit: This closes #3995: Add custom gcs temp location support to BigQueryIO

Posted by jk...@apache.org.
This closes #3995: Add custom gcs temp location support to BigQueryIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a5cbd764
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a5cbd764
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a5cbd764

Branch: refs/heads/master
Commit: a5cbd764bb81ca2aefe225d62e15e222a2266f0b
Parents: ec052bb ec58a80
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Oct 16 15:18:31 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon Oct 16 15:18:31 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    | 25 ++++++++--
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 49 ++++++++++++++++----
 2 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------