You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/26 06:10:36 UTC

[1/2] incubator-beam git commit: [BEAM-377] Validate BigQueryIO.Read is properly configured

Repository: incubator-beam
Updated Branches:
  refs/heads/master ab74eac34 -> 4f580f5f1


[BEAM-377] Validate BigQueryIO.Read is properly configured

Previously, using withoutValidation would disable all validation,
leading to a NullPointerException if there wasn't a table or schema
provided.

The intention of the withoutValidation parameter is to bypass more
expensive (and possibly incorrect) checks, such as the existence of
the table prior to pipeline execution in cases where earlier stages
create the table.

This moves the basic usage validation to always happen, while the
extended validation is still disabled by withoutValidation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7613b9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7613b9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7613b9b

Branch: refs/heads/master
Commit: d7613b9bbb8782a959c12453a5f28dcefeecb102
Parents: ab74eac
Author: Ben Chambers <bc...@google.com>
Authored: Sat Jun 25 14:11:17 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Sat Jun 25 23:10:25 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 47 ++++++++++++--------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 31 ++++++++++++-
 2 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 1c666ed..6a36c8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -389,6 +389,12 @@ public class BigQueryIO {
     public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
       @Nullable final String jsonTableRef;
       @Nullable final String query;
+
+      /**
+       * Whether to validate that the table exists or the query succeeds prior to pipeline
+       * submission. Basic validation (such as ensuring that a query or table is specified)
+       * always occurs, regardless of this flag.
+       */
       final boolean validate;
       @Nullable final Boolean flattenResults;
       @Nullable final BigQueryServices testBigQueryServices;
@@ -467,7 +473,9 @@ public class BigQueryIO {
       }
 
       /**
-       * Disable table validation.
+       * Disable validation that the table exists or the query succeeds prior to pipeline
+       * submission. Basic validation (such as ensuring that a query or table is specified) still
+       * occurs.
        */
       public Bound withoutValidation() {
         return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
@@ -491,24 +499,27 @@ public class BigQueryIO {
 
       @Override
       public void validate(PInput input) {
-        if (validate) {
-          BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-          TableReference table = getTableWithDefaultProject(bqOptions);
-          if (table == null && query == null) {
-            throw new IllegalStateException(
-                "Invalid BigQuery read operation, either table reference or query has to be set");
-          } else if (table != null && query != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
-                + " query and a table, only one of these should be provided");
-          } else if (table != null && flattenResults != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-                + " table with a result flattening preference, which is not configurable");
-          } else if (query != null && flattenResults == null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-                + " query without a result flattening preference");
-          }
+        // Even if existence validation is disabled, we need to make sure that the BigQueryIO
+        // read is properly specified.
+        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+        TableReference table = getTableWithDefaultProject(bqOptions);
+        if (table == null && query == null) {
+          throw new IllegalStateException(
+              "Invalid BigQuery read operation, either table reference or query has to be set");
+        } else if (table != null && query != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+              + " query and a table, only one of these should be provided");
+        } else if (table != null && flattenResults != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+        } else if (query != null && flattenResults == null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+              + " query without a result flattening preference");
+        }
 
+        // Only verify existence/correctness if validation is enabled.
+        if (validate) {
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier
           // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7613b9b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 2a135ec..a1daf72 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -26,8 +26,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
@@ -473,6 +473,17 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
+  public void testBuildSourceWithoutTableQueryOrValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation, either table reference or query has to be set");
+    p.apply(BigQueryIO.Read.withoutValidation());
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndQuery() {
     Pipeline p = TestPipeline.create();
     thrown.expect(IllegalStateException.class);
@@ -502,6 +513,22 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndFlattenWithoutValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+    p.apply(
+        BigQueryIO.Read.named("ReadMyTable")
+            .from("foo.com:project:somedataset.sometable")
+            .withoutValidation()
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadFromTable() {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()


[2/2] incubator-beam git commit: Closes #535

Posted by dh...@apache.org.
Closes #535


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f580f5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f580f5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f580f5f

Branch: refs/heads/master
Commit: 4f580f5f1b0ccc7a007c2c188e1bdc467390f484
Parents: ab74eac d7613b9
Author: Dan Halperin <dh...@google.com>
Authored: Sat Jun 25 23:10:26 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Sat Jun 25 23:10:26 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 47 ++++++++++++--------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 31 ++++++++++++-
 2 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------