You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:17 UTC

[11/50] [abbrv] incubator-beam git commit: [BEAM-377] Validate BigQueryIO.Read is properly configured

[BEAM-377] Validate BigQueryIO.Read is properly configured

Previously, using withoutValidation would disable all validation,
leading to a NullPointerException if there wasn't a table or schema
provided.

The intention of the withoutValidation parameter is to bypass more
expensive (and possibly incorrect checks, such as the existence of
the table prior to pipeline execution in cases where earlier stages
create the table).

This moves the basic usage validation to always happen, while the
extended validation is still disabled by withoutValidation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a25322e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a25322e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a25322e1

Branch: refs/heads/runners-spark2
Commit: a25322e1bc217e1e185dc2beed72e28d1f5a9cc1
Parents: 38dfb63
Author: Ben Chambers <bc...@google.com>
Authored: Sat Jun 25 14:11:17 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 47 ++++++++++++--------
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 31 ++++++++++++-
 2 files changed, 58 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a25322e1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 1c666ed..6a36c8d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -389,6 +389,12 @@ public class BigQueryIO {
     public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
       @Nullable final String jsonTableRef;
       @Nullable final String query;
+
+      /**
+       * Disable validation that the table exists or the query succeeds prior to pipeline
+       * submission. Basic validation (such as ensuring that a query or table is specified) still
+       * occurs.
+       */
       final boolean validate;
       @Nullable final Boolean flattenResults;
       @Nullable final BigQueryServices testBigQueryServices;
@@ -467,7 +473,9 @@ public class BigQueryIO {
       }
 
       /**
-       * Disable table validation.
+       * Disable validation that the table exists or the query succeeds prior to pipeline
+       * submission. Basic validation (such as ensuring that a query or table is specified) still
+       * occurs.
        */
       public Bound withoutValidation() {
         return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
@@ -491,24 +499,27 @@ public class BigQueryIO {
 
       @Override
       public void validate(PInput input) {
-        if (validate) {
-          BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-          TableReference table = getTableWithDefaultProject(bqOptions);
-          if (table == null && query == null) {
-            throw new IllegalStateException(
-                "Invalid BigQuery read operation, either table reference or query has to be set");
-          } else if (table != null && query != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
-                + " query and a table, only one of these should be provided");
-          } else if (table != null && flattenResults != null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-                + " table with a result flattening preference, which is not configurable");
-          } else if (query != null && flattenResults == null) {
-            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-                + " query without a result flattening preference");
-          }
+        // Even if existence validation is disabled, we need to make sure that the BigQueryIO
+        // read is properly specified.
+        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+        TableReference table = getTableWithDefaultProject(bqOptions);
+        if (table == null && query == null) {
+          throw new IllegalStateException(
+              "Invalid BigQuery read operation, either table reference or query has to be set");
+        } else if (table != null && query != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+              + " query and a table, only one of these should be provided");
+        } else if (table != null && flattenResults != null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+        } else if (query != null && flattenResults == null) {
+          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+              + " query without a result flattening preference");
+        }
 
+        // Only verify existence/correctness if validation is enabled.
+        if (validate) {
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier
           // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a25322e1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index 2a135ec..a1daf72 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -26,8 +26,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.eq;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
@@ -473,6 +473,17 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(RunnableOnService.class)
+  public void testBuildSourceWithoutTableQueryOrValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation, either table reference or query has to be set");
+    p.apply(BigQueryIO.Read.withoutValidation());
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
   public void testBuildSourceWithTableAndQuery() {
     Pipeline p = TestPipeline.create();
     thrown.expect(IllegalStateException.class);
@@ -502,6 +513,22 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndFlattenWithoutValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+    p.apply(
+        BigQueryIO.Read.named("ReadMyTable")
+            .from("foo.com:project:somedataset.sometable")
+            .withoutValidation()
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testReadFromTable() {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()