Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:02 UTC

[08/10] beam git commit: Replace BigQueryIO.Read.from() with BigQueryIO.read().from()

Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
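
For readers of the archive: this commit only renames the entry point for table and query reads; the new read() factory sets the same defaults the removed static builder() did. A minimal sketch of the rename follows (the table spec and query strings are hypothetical placeholders, and actually running it needs valid GCP options and real tables):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadMigrationSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Before this commit, from()/fromQuery() were static methods on the Read class:
    //   pipeline.apply(BigQueryIO.Read.from("my-project:my_dataset.my_table"));

    // After this commit, read() builds a Read instance and from()/fromQuery()
    // are instance methods on it:
    PCollection<TableRow> rows =
        pipeline.apply(BigQueryIO.read().from("my-project:my_dataset.my_table"));
    PCollection<TableRow> queried =
        pipeline.apply("ReadByQuery",
            BigQueryIO.read().fromQuery("SELECT year FROM [my_dataset.my_table]"));

    // Running requires real tables, a --tempLocation and GCP credentials;
    // the construction above is what this commit changes.
    pipeline.run().waitUntilFinish();
  }
}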


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a252a77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a252a77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a252a77

Branch: refs/heads/master
Commit: 1a252a771127febe551fda5d499c7ecb3b95cf23
Parents: 1adcbae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:15:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:36 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../beam/examples/cookbook/JoinExamples.java    |  4 +--
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../org/apache/beam/sdk/io/package-info.java    |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 36 +++++++++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 36 ++++++++++----------
 8 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 079674a..d3c9167 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -156,7 +156,7 @@ public class BigQueryTornadoes {
     fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new CountTornadoes())
      .apply(BigQueryIO.Write
         .to(options.getOutput())
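
The hunk above is cut off after .to(options.getOutput()) by the diff context. For orientation, the read-transform-write shape of these cookbook examples looks roughly like the fragment below; p, options, CountTornadoes and the imports come from the surrounding example class, and the write configuration after .to(...) is an assumption about the untouched remainder of the file, not part of this diff. Only the read entry point changes in this commit.

// Approximate fragment of the BigQueryTornadoes pipeline body:
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);

p.apply(BigQueryIO.read().from(options.getInput()))   // the line changed in this commit
 .apply(new CountTornadoes())                          // user transform, unchanged
 .apply(BigQueryIO.Write                               // write side untouched by this commit
     .to(options.getOutput())
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));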

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 37f9d79..fc54b13 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -200,7 +200,7 @@ public class CombinePerKeyExamples {
     fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new PlaysForWord())
      .apply(BigQueryIO.Write
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index fb6b507..714a8f2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -238,7 +238,7 @@ public class FilterExamples {
 
     TableSchema schema = buildWeatherSchemaProjection();
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(ParDo.of(new ProjectionFn()))
      .apply(new BelowGlobalMean(options.getMonthFilter()))
      .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 7cf0942..05a3ad3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -166,8 +166,8 @@ public class JoinExamples {
     Pipeline p = Pipeline.create(options);
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
-    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
     PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
     formattedResults.apply(TextIO.Write.to(options.getOutput()));
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index eabc42b..7e7bc72 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -149,7 +149,7 @@ public class MaxPerKeyExamples {
     fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new MaxMeanTemp())
      .apply(BigQueryIO.Write
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c4ff158..c65d7dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -24,7 +24,7 @@
  * from existing storage:
  * <pre>{@code
  * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e039c8c..dfb7ea6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
  * <pre>{@code
  * PCollection<TableRow> weatherData = pipeline.apply(
- *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  *
  * <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -177,7 +177,7 @@ import org.slf4j.LoggerFactory;
  *
  * <pre>{@code
  * PCollection<TableRow> meanTemperatureData = pipeline.apply(
- *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
+ *     BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }</pre>
  *
  * <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -454,6 +454,14 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
+  public static Read read() {
+    return new AutoValue_BigQueryIO_Read.Builder()
+        .setValidate(true)
+        .setBigQueryServices(new BigQueryServicesImpl())
+        .build();
+  }
+
+  /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
     @Nullable abstract ValueProvider<String> getJsonTableRef();
@@ -477,25 +485,26 @@ public class BigQueryIO {
       abstract Read build();
     }
 
-    private static Builder builder() {
-      return new AutoValue_BigQueryIO_Read.Builder()
-          .setValidate(true)
-          .setBigQueryServices(new BigQueryServicesImpl());
+    /** Ensures that methods of the from() / fromQuery() family are called at most once. */
+    private void ensureFromNotCalledYet() {
+      checkState(
+          getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
     }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
-    public static Read from(String tableSpec) {
+    public Read from(String tableSpec) {
       return from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Read from(ValueProvider<String> tableSpec) {
-      return builder()
+    public Read from(ValueProvider<String> tableSpec) {
+      ensureFromNotCalledYet();
+      return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -505,21 +514,22 @@ public class BigQueryIO {
     /**
      * Reads results received after executing the given query.
      */
-    public static Read fromQuery(String query) {
+    public Read fromQuery(String query) {
       return fromQuery(StaticValueProvider.of(query));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Read fromQuery(ValueProvider<String> query) {
-      return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+    public Read fromQuery(ValueProvider<String> query) {
+      ensureFromNotCalledYet();
+      return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
-    public static Read from(TableReference table) {
+    public Read from(TableReference table) {
       return from(StaticValueProvider.of(toTableSpec(table)));
     }
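
A consequence of making from() and fromQuery() instance methods is the ensureFromNotCalledYet() guard introduced above: each Read instance may be given either a table or a query, but not both. A small sketch of how that surfaces at pipeline-construction time (table spec and query strings are hypothetical):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class SingleSourceGuardSketch {
  public static void main(String[] args) {
    // Each Read gets exactly one source:
    BigQueryIO.Read tableRead =
        BigQueryIO.read().from("my-project:my_dataset.my_table");
    BigQueryIO.Read queryRead =
        BigQueryIO.read().fromQuery("SELECT year FROM [my_dataset.my_table]");

    // Mixing them on one instance now fails fast: the checkState(...) call in
    // ensureFromNotCalledYet() throws IllegalStateException with the message
    // "from() or fromQuery() already called".
    BigQueryIO.Read invalid =
        BigQueryIO.read()
            .from("my-project:my_dataset.my_table")
            .fromQuery("SELECT year FROM [my_dataset.my_table]");
  }
}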
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 888d9c1..f6a7fb4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -727,13 +727,13 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+    BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
     checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+    BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query");
     checkReadQueryObject(read, "foo_query");
   }
 
@@ -742,7 +742,7 @@ public class BigQueryIOTest implements Serializable {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
     BigQueryIO.Read read =
-        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
+        BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation();
     checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
   }
 
@@ -751,14 +751,14 @@ public class BigQueryIOTest implements Serializable {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
     BigQueryIO.Read read =
-        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
+        BigQueryIO.read().fromQuery("some_query").withoutValidation();
     checkReadQueryObjectWithValidate(read, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
     BigQueryIO.Read read =
-        BigQueryIO.Read.from("somedataset.sometable");
+        BigQueryIO.read().from("somedataset.sometable");
     checkReadTableObject(read, null, "somedataset", "sometable");
   }
 
@@ -768,7 +768,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read read = BigQueryIO.Read.from(table);
+    BigQueryIO.Read read = BigQueryIO.read().from(table);
     checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
@@ -800,7 +800,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expect(RuntimeException.class);
     // Message will be one of following depending on the execution environment.
     thrown.expectMessage(Matchers.containsString("Unsupported"));
-    p.apply(BigQueryIO.Read.from(tableRef)
+    p.apply(BigQueryIO.read().from(tableRef)
         .withTestServices(fakeBqServices));
   }
 
@@ -817,7 +817,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
             + " which only applies to queries");
     p.apply("ReadMyTable",
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .withoutResultFlattening());
     p.run();
@@ -836,7 +836,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
             + " which only applies to queries");
     p.apply(
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .withoutValidation()
             .withoutResultFlattening());
@@ -856,7 +856,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
             + " which only applies to queries");
     p.apply(
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .usingStandardSql());
     p.run();
@@ -929,7 +929,7 @@ public class BigQueryIOTest implements Serializable {
 
     Pipeline p = TestPipeline.create(bqOptions);
     PCollection<String> output = p
-        .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
+        .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
         .apply(ParDo.of(new DoFn<TableRow, String>() {
@@ -1260,7 +1260,7 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
 
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .from(tableSpec)
         .withoutResultFlattening()
         .usingStandardSql()
@@ -1276,7 +1276,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSourceDisplayDataQuery() {
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .fromQuery("myQuery")
         .withoutResultFlattening()
         .usingStandardSql()
@@ -1295,7 +1295,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1312,7 +1312,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBigQueryIOGetName() {
-    assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
+    assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
     assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
   }
 
@@ -2317,7 +2317,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read read = BigQueryIO.Read.from(
+    BigQueryIO.Read read = BigQueryIO.read().from(
         options.getInputTable()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2330,7 +2330,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read = BigQueryIO.read().fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.