You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/18 21:35:29 UTC

[2/2] incubator-beam git commit: [BEAM-50] BigQueryIO: move write validation to validate() from apply()

[BEAM-50] BigQueryIO: move write validation to validate() from apply()

If runners override the transform, validation will still happen.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3fe10227
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3fe10227
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3fe10227

Branch: refs/heads/master
Commit: 3fe10227935e1b2a5d8af82338cb8d635f1b564e
Parents: becc1ef
Author: Dan Halperin <dh...@google.com>
Authored: Sat Apr 16 12:19:01 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Apr 18 12:34:23 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 52 ++++++++++++--------
 1 file changed, 31 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3fe10227/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index eb99637..aff1e4c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -441,9 +441,6 @@ public class BigQueryIO {
         return new Bound(name, query, table, validate, false);
       }
 
-      /**
-       * Validates the current {@link PTransform}.
-       */
       @Override
       public void validate(PInput input) {
         if (table == null && query == null) {
@@ -931,7 +928,7 @@ public class BigQueryIO {
       }
 
       @Override
-      public PDone apply(PCollection<TableRow> input) {
+      public void validate(PCollection<TableRow> input) {
         BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
         TableReference table = getTable();
@@ -963,7 +960,7 @@ public class BigQueryIO {
         // Note that a presence check can fail if the table or dataset are created by earlier stages
         // of the pipeline. For these cases the withoutValidation method can be used to disable
         // the check.
-        // Unfortunately we can't validate anything early in case tableRefFunction is specified.
+        // Unfortunately we can't validate anything early if tableRefFunction is specified.
         if (table != null && validate) {
           verifyDatasetPresence(options, table);
           if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
@@ -974,9 +971,8 @@ public class BigQueryIO {
           }
         }
 
-        // In streaming, BigQuery write is taken care of by StreamWithDeDup transform.
-        // We also currently do this if a tablespec function is specified.
         if (options.isStreaming() || tableRefFunction != null) {
+          // We will use BigQuery's streaming write API -- validate support dispositions.
           if (createDisposition == CreateDisposition.CREATE_NEVER) {
             throw new IllegalArgumentException("CreateDispostion.CREATE_NEVER is not "
                 + "supported for unbounded PCollections or when using tablespec functions.");
@@ -986,24 +982,38 @@ public class BigQueryIO {
             throw new IllegalArgumentException("WriteDisposition.WRITE_TRUNCATE is not "
                 + "supported for unbounded PCollections or when using tablespec functions.");
           }
-
-          return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema()));
+        } else {
+          // We will use a BigQuery load job -- validate the temp location.
+          String tempLocation = options.getTempLocation();
+          checkArgument(
+              !Strings.isNullOrEmpty(tempLocation),
+              "BigQueryIO.Write needs a GCS temp location to store temp files.");
+          if (testBigQueryServices == null) {
+            try {
+              GcsPath.fromUri(tempLocation);
+            } catch (IllegalArgumentException e) {
+              throw new IllegalArgumentException(
+                  String.format(
+                      "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                      tempLocation),
+                  e);
+            }
+          }
         }
+      }
 
-        String tempLocation = options.getTempLocation();
-        checkArgument(!Strings.isNullOrEmpty(tempLocation),
-            "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (testBigQueryServices == null) {
-          try {
-            GcsPath.fromUri(tempLocation);
-          } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException(String.format(
-                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                tempLocation), e);
-          }
+      @Override
+      public PDone apply(PCollection<TableRow> input) {
+        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+        // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
+        // and BigQuery's streaming import API.
+        if (options.isStreaming() || tableRefFunction != null) {
+          return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
         }
+
         String jobIdToken = UUID.randomUUID().toString();
-        String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken;
+        String tempFilePrefix = options.getTempLocation() + "/BigQuerySinkTemp/" + jobIdToken;
         BigQueryServices bqServices = getBigQueryServices();
         return input.apply("Write", org.apache.beam.sdk.io.Write.to(
             new BigQuerySink(