You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:59 UTC

[05/10] beam git commit: Use AutoValue for BigQueryIO.Read

Use AutoValue for BigQueryIO.Read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32cba321
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32cba321
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32cba321

Branch: refs/heads/master
Commit: 32cba321c04c5e3fb18856c84ea10b3513264dd5
Parents: d6b3e11
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:51:04 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 215 +++++++------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  21 +-
 2 files changed, 84 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 90d7f67..e5db60e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -36,9 +36,11 @@ import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -453,97 +455,86 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
-    @Nullable final ValueProvider<String> jsonTableRef;
-    @Nullable final ValueProvider<String> query;
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+    @Nullable abstract ValueProvider<String> getJsonTableRef();
+    @Nullable abstract ValueProvider<String> getQuery();
+    abstract boolean getValidate();
+    @Nullable abstract Boolean getFlattenResults();
+    @Nullable abstract Boolean getUseLegacySql();
+    @Nullable abstract BigQueryServices getBigQueryServices();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+      abstract Builder setQuery(ValueProvider<String> query);
+      abstract Builder setValidate(boolean validate);
+      abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
+      abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
+      abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+
+      abstract Read build();
+    }
+
+    private static Builder builder() {
+      return new AutoValue_BigQueryIO_Read.Builder()
+          .setValidate(true)
+          .setBigQueryServices(new BigQueryServicesImpl());
+    }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
     public static Read from(String tableSpec) {
-      return new Read().from(StaticValueProvider.of(tableSpec));
+      return from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
     public static Read from(ValueProvider<String> tableSpec) {
-      return new Read().from(tableSpec);
+      return builder()
+          .setJsonTableRef(
+              NestedValueProvider.of(
+                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+                  new TableRefToJson())).build();
     }
 
     /**
      * Reads results received after executing the given query.
      */
     public static Read fromQuery(String query) {
-      return new Read().fromQuery(StaticValueProvider.of(query));
+      return fromQuery(StaticValueProvider.of(query));
     }
 
     /**
      * Same as {@code fromQuery(String)}, but with a {@link ValueProvider}.
      */
     public static Read fromQuery(ValueProvider<String> query) {
-      return new Read().fromQuery(query);
+      return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
     public static Read from(TableReference table) {
-      return new Read().from(table);
+      return from(StaticValueProvider.of(toTableSpec(table)));
     }
 
-    /**
-     * Disable validation that the table exists or the query succeeds prior to pipeline
-     * submission. Basic validation (such as ensuring that a query or table is specified) still
-     * occurs.
-     */
-    final boolean validate;
-    @Nullable final Boolean flattenResults;
-    @Nullable final Boolean useLegacySql;
-    @Nullable BigQueryServices bigQueryServices;
-
-    @VisibleForTesting @Nullable String stepUuid;
-    @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
     private static final String QUERY_VALIDATION_FAILURE_ERROR =
         "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
         + " pipeline, This validation can be disabled using #withoutValidation.";
 
-    private Read() {
-      this(
-          null /* name */,
-          null /* query */,
-          null /* jsonTableRef */,
-          true /* validate */,
-          null /* flattenResults */,
-          null /* useLegacySql */,
-          null /* bigQueryServices */);
-    }
-
-    private Read(
-        String name, @Nullable ValueProvider<String> query,
-        @Nullable ValueProvider<String> jsonTableRef, boolean validate,
-        @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
-        @Nullable BigQueryServices bigQueryServices) {
-      super(name);
-      this.jsonTableRef = jsonTableRef;
-      this.query = query;
-      this.validate = validate;
-      this.flattenResults = flattenResults;
-      this.useLegacySql = useLegacySql;
-      this.bigQueryServices = bigQueryServices;
-    }
-
     /**
      * Disable validation that the table exists or the query succeeds prior to pipeline
      * submission. Basic validation (such as ensuring that a query or table is specified) still
      * occurs.
      */
     public Read withoutValidation() {
-      return new Read(
-          name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
-          bigQueryServices);
+      return toBuilder().setValidate(false).build();
     }
 
     /**
@@ -554,9 +545,7 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read withoutResultFlattening() {
-      return new Read(
-          name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
-          bigQueryServices);
+      return toBuilder().setFlattenResults(false).build();
     }
 
     /**
@@ -566,15 +555,12 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read usingStandardSql() {
-      return new Read(
-          name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
-          bigQueryServices);
+      return toBuilder().setUseLegacySql(false).build();
     }
 
     @VisibleForTesting
     Read withTestServices(BigQueryServices testServices) {
-      return new Read(
-          name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+      return toBuilder().setBigQueryServices(testServices).build();
     }
 
     @Override
@@ -587,7 +573,7 @@ public class BigQueryIO {
       checkArgument(
           !Strings.isNullOrEmpty(tempLocation),
           "BigQueryIO.Read needs a GCS temp location to store temp files.");
-      if (bigQueryServices == null) {
+      if (getBigQueryServices() == null) {
         try {
           GcsPath.fromUri(tempLocation);
         } catch (IllegalArgumentException e) {
@@ -602,63 +588,65 @@ public class BigQueryIO {
       ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
 
       checkState(
-          table == null || query == null,
+          table == null || getQuery() == null,
           "Invalid BigQueryIO.Read: table reference and query may not both be set");
       checkState(
-          table != null || query != null,
+          table != null || getQuery() != null,
           "Invalid BigQueryIO.Read: one of table reference and query must be set");
 
       if (table != null) {
         checkState(
-            flattenResults == null,
+            getFlattenResults() == null,
             "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
                 + " preference, which only applies to queries");
         checkState(
-            useLegacySql == null,
+            getUseLegacySql() == null,
             "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
                 + " preference, which only applies to queries");
       } else /* query != null */ {
-        checkState(flattenResults != null, "flattenResults should not be null if query is set");
-        checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+        checkState(
+            getFlattenResults() != null, "flattenResults should not be null if query is set");
+        checkState(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
       }
 
       // Note that a table or query check can fail if the table or dataset are created by
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
-      if (validate && table != null) {
+      if (getValidate() && table != null) {
         checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
         // Check for source table presence for early failure notification.
         DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
         verifyDatasetPresence(datasetService, table.get());
         verifyTablePresence(datasetService, table.get());
-      } else if (validate && query != null) {
-        checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+      } else if (getValidate() && getQuery() != null) {
+        checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
         JobService jobService = getBigQueryServices().getJobService(bqOptions);
         try {
           jobService.dryRunQuery(
               bqOptions.getProject(),
               new JobConfigurationQuery()
-                  .setQuery(query.get())
-                  .setFlattenResults(flattenResults)
-                  .setUseLegacySql(useLegacySql));
+                  .setQuery(getQuery().get())
+                  .setFlattenResults(getFlattenResults())
+                  .setUseLegacySql(getUseLegacySql()));
         } catch (Exception e) {
           throw new IllegalArgumentException(
-              String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
+              String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
         }
       }
     }
 
     @Override
     public PCollection<TableRow> expand(PBegin input) {
-      stepUuid = randomUUIDString();
+      String stepUuid = randomUUIDString();
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-      jobUuid = NestedValueProvider.of(
+      ValueProvider<String> jobUuid = NestedValueProvider.of(
          StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
       final ValueProvider<String> jobIdToken = NestedValueProvider.of(
           jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
       BoundedSource<TableRow> source;
-      final BigQueryServices bqServices = getBigQueryServices();
+      final BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
@@ -671,11 +659,18 @@ public class BigQueryIO {
       }
 
       final String executingProject = bqOptions.getProject();
-      if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
-        source = BigQueryQuerySource.create(
-            jobIdToken, query, NestedValueProvider.of(
-              jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
-            flattenResults, useLegacySql, extractDestinationDir, bqServices);
+      if (getQuery() != null
+          && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
+        source =
+            BigQueryQuerySource.create(
+                jobIdToken,
+                getQuery(),
+                NestedValueProvider.of(
+                    jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+                getFlattenResults(),
+                getUseLegacySql(),
+                extractDestinationDir,
+                bqServices);
       } else {
         ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
         source = BigQueryTableSource.create(
@@ -726,13 +721,13 @@ public class BigQueryIO {
       builder
           .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
             .withLabel("Table"))
-          .addIfNotNull(DisplayData.item("query", query)
+          .addIfNotNull(DisplayData.item("query", getQuery())
             .withLabel("Query"))
-          .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+          .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults())
             .withLabel("Flatten Query Results"))
-          .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+          .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql())
             .withLabel("Use Legacy SQL Dialect"))
-          .addIfNotDefault(DisplayData.item("validation", validate)
+          .addIfNotDefault(DisplayData.item("validation", getValidate())
             .withLabel("Validation Enabled"),
               true);
     }
@@ -769,8 +764,8 @@ public class BigQueryIO {
      */
     @Nullable
     public ValueProvider<TableReference> getTableProvider() {
-      return jsonTableRef == null
-          ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+      return getJsonTableRef() == null
+          ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
     /**
      * Returns the table to read, or {@code null} if reading from a query instead.
@@ -780,52 +775,6 @@ public class BigQueryIO {
       ValueProvider<TableReference> provider = getTableProvider();
       return provider == null ? null : provider.get();
     }
-
-    /**
-     * Returns the query to be read, or {@code null} if reading from a table instead.
-     */
-    @Nullable
-    public String getQuery() {
-      return query == null ? null : query.get();
-    }
-
-    /**
-     * Returns the query to be read, or {@code null} if reading from a table instead.
-     */
-    @Nullable
-    public ValueProvider<String> getQueryProvider() {
-      return query;
-    }
-
-    /**
-     * Returns true if table validation is enabled.
-     */
-    public boolean getValidate() {
-      return validate;
-    }
-
-    /**
-     * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
-     */
-    public Boolean getFlattenResults() {
-      return flattenResults;
-    }
-
-    /**
-     * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
-     * if not applicable.
-     */
-    @Nullable
-    public Boolean getUseLegacySql() {
-      return useLegacySql;
-    }
-
-    private BigQueryServices getBigQueryServices() {
-      if (bigQueryServices == null) {
-        bigQueryServices = new BigQueryServicesImpl();
-      }
-      return bigQueryServices;
-    }
   }
 
   /**
@@ -1863,7 +1812,7 @@ public class BigQueryIO {
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      String stepUuid = randomUUIDString();
+      final String stepUuid = randomUUIDString();
 
       String tempLocation = options.getTempLocation();
       String tempFilePrefix;
@@ -1886,7 +1835,7 @@ public class BigQueryIO {
               new SimpleFunction<String, String>() {
                 @Override
                 public String apply(String input) {
-                  return randomUUIDString();
+                  return stepUuid;
                 }
               }))
           .apply(View.<String>asSingleton());

http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f403c5a..fdaa81c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -678,14 +677,14 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(project, read.getTable().getProjectId());
     assertEquals(dataset, read.getTable().getDatasetId());
     assertEquals(table, read.getTable().getTableId());
-    assertNull(read.query);
+    assertNull(read.getQuery());
     assertEquals(validate, read.getValidate());
   }
 
   private void checkReadQueryObjectWithValidate(
       BigQueryIO.Read read, String query, boolean validate) {
     assertNull(read.getTable());
-    assertEquals(query, read.getQuery());
+    assertEquals(query, read.getQuery().get());
     assertEquals(validate, read.getValidate());
   }
 
@@ -2433,20 +2432,4 @@ public class BigQueryIOTest implements Serializable {
                 BigQueryIO.TableRowInfoCoder.of()),
             IntervalWindow.getCoder()));
   }
-
-  @Test
-  public void testUniqueStepIdRead() {
-    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    Pipeline pipeline = TestPipeline.create(options);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-    BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
-        options.getInputQuery()).withoutValidation();
-    pipeline.apply(read1);
-    BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
-        options.getInputQuery()).withoutValidation();
-    pipeline.apply(read2);
-    assertNotEquals(read1.stepUuid, read2.stepUuid);
-    assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
-  }
 }