Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:55 UTC

[01/10] beam git commit: Remove two unused fields

Repository: beam
Updated Branches:
  refs/heads/master cc12fd378 -> 806c53c18


Remove two unused fields


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f36344
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f36344
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f36344

Branch: refs/heads/master
Commit: 30f363444679293727171f2417c9d991a4bf7852
Parents: cc12fd3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:54:55 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:19 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/30f36344/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0e1c6fc..2902c2b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -130,7 +130,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1189,15 +1188,9 @@ public class BigQueryIO {
    * ...
    */
   private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
-    // The maximum number of retries to verify temp files.
-    private static final int MAX_FILES_VERIFY_RETRIES = 9;
-
     // The maximum number of retries to poll a BigQuery job.
     protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-    // The initial backoff for verifying temp files.
-    private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
-
     protected final ValueProvider<String> jobIdToken;
     protected final String extractDestinationDir;
     protected final BigQueryServices bqServices;


[08/10] beam git commit: Replace BigQueryIO.Read.from() with BigQueryIO.read().from()

Posted by tg...@apache.org.
Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
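
A minimal sketch of the call-site migration, assuming a hypothetical table spec
and a pipeline built from command-line options; only the entry point changes,
and the rest of the chain reads the same:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadMigrationSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Before: static methods on the nested Read class itself.
        //   p.apply(BigQueryIO.Read.from("my-project:my_dataset.my_table"));

        // After: the top-level factory method returns a Read instance, and
        // from()/fromQuery() are instance methods on that one object.
        PCollection<TableRow> rows =
            p.apply(BigQueryIO.read().from("my-project:my_dataset.my_table"));

        p.run().waitUntilFinish();
      }
    }

The factory shape also lets read() set its defaults (validation enabled, the
production BigQueryServicesImpl) in one place, replacing the private builder()
helper, as the BigQueryIO.java hunk below shows.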


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a252a77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a252a77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a252a77

Branch: refs/heads/master
Commit: 1a252a771127febe551fda5d499c7ecb3b95cf23
Parents: 1adcbae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:15:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:36 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../beam/examples/cookbook/JoinExamples.java    |  4 +--
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../org/apache/beam/sdk/io/package-info.java    |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 36 +++++++++++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 36 ++++++++++----------
 8 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 079674a..d3c9167 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -156,7 +156,7 @@ public class BigQueryTornadoes {
     fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new CountTornadoes())
      .apply(BigQueryIO.Write
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 37f9d79..fc54b13 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -200,7 +200,7 @@ public class CombinePerKeyExamples {
     fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new PlaysForWord())
      .apply(BigQueryIO.Write
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index fb6b507..714a8f2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -238,7 +238,7 @@ public class FilterExamples {
 
     TableSchema schema = buildWeatherSchemaProjection();
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(ParDo.of(new ProjectionFn()))
      .apply(new BelowGlobalMean(options.getMonthFilter()))
      .apply(BigQueryIO.Write

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 7cf0942..05a3ad3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -166,8 +166,8 @@ public class JoinExamples {
     Pipeline p = Pipeline.create(options);
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
-    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
-    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+    PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
+    PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
     PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
     formattedResults.apply(TextIO.Write.to(options.getOutput()));
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index eabc42b..7e7bc72 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -149,7 +149,7 @@ public class MaxPerKeyExamples {
     fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
     TableSchema schema = new TableSchema().setFields(fields);
 
-    p.apply(BigQueryIO.Read.from(options.getInput()))
+    p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new MaxMeanTemp())
      .apply(BigQueryIO.Write
         .to(options.getOutput())

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c4ff158..c65d7dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -24,7 +24,7 @@
  * from existing storage:
  * <pre>{@code
  * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e039c8c..dfb7ea6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
  * <pre>{@code
  * PCollection<TableRow> weatherData = pipeline.apply(
- *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
  * }</pre>
  *
  * <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -177,7 +177,7 @@ import org.slf4j.LoggerFactory;
  *
  * <pre>{@code
  * PCollection<TableRow> meanTemperatureData = pipeline.apply(
- *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
+ *     BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }</pre>
  *
  * <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -454,6 +454,14 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
+  public static Read read() {
+    return new AutoValue_BigQueryIO_Read.Builder()
+        .setValidate(true)
+        .setBigQueryServices(new BigQueryServicesImpl())
+        .build();
+  }
+
+  /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
     @Nullable abstract ValueProvider<String> getJsonTableRef();
@@ -477,25 +485,26 @@ public class BigQueryIO {
       abstract Read build();
     }
 
-    private static Builder builder() {
-      return new AutoValue_BigQueryIO_Read.Builder()
-          .setValidate(true)
-          .setBigQueryServices(new BigQueryServicesImpl());
+    /** Ensures that methods of the from() / fromQuery() family are called at most once. */
+    private void ensureFromNotCalledYet() {
+      checkState(
+          getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
     }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
-    public static Read from(String tableSpec) {
+    public Read from(String tableSpec) {
       return from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Read from(ValueProvider<String> tableSpec) {
-      return builder()
+    public Read from(ValueProvider<String> tableSpec) {
+      ensureFromNotCalledYet();
+      return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -505,21 +514,22 @@ public class BigQueryIO {
     /**
      * Reads results received after executing the given query.
      */
-    public static Read fromQuery(String query) {
+    public Read fromQuery(String query) {
       return fromQuery(StaticValueProvider.of(query));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Read fromQuery(ValueProvider<String> query) {
-      return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+    public Read fromQuery(ValueProvider<String> query) {
+      ensureFromNotCalledYet();
+      return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
-    public static Read from(TableReference table) {
+    public Read from(TableReference table) {
       return from(StaticValueProvider.of(toTableSpec(table)));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 888d9c1..f6a7fb4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -727,13 +727,13 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+    BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
     checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+    BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query");
     checkReadQueryObject(read, "foo_query");
   }
 
@@ -742,7 +742,7 @@ public class BigQueryIOTest implements Serializable {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
     BigQueryIO.Read read =
-        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
+        BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation();
     checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
   }
 
@@ -751,14 +751,14 @@ public class BigQueryIOTest implements Serializable {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
     BigQueryIO.Read read =
-        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
+        BigQueryIO.read().fromQuery("some_query").withoutValidation();
     checkReadQueryObjectWithValidate(read, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
     BigQueryIO.Read read =
-        BigQueryIO.Read.from("somedataset.sometable");
+        BigQueryIO.read().from("somedataset.sometable");
     checkReadTableObject(read, null, "somedataset", "sometable");
   }
 
@@ -768,7 +768,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read read = BigQueryIO.Read.from(table);
+    BigQueryIO.Read read = BigQueryIO.read().from(table);
     checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
@@ -800,7 +800,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expect(RuntimeException.class);
     // Message will be one of following depending on the execution environment.
     thrown.expectMessage(Matchers.containsString("Unsupported"));
-    p.apply(BigQueryIO.Read.from(tableRef)
+    p.apply(BigQueryIO.read().from(tableRef)
         .withTestServices(fakeBqServices));
   }
 
@@ -817,7 +817,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
             + " which only applies to queries");
     p.apply("ReadMyTable",
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .withoutResultFlattening());
     p.run();
@@ -836,7 +836,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
             + " which only applies to queries");
     p.apply(
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .withoutValidation()
             .withoutResultFlattening());
@@ -856,7 +856,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
             + " which only applies to queries");
     p.apply(
-        BigQueryIO.Read
+        BigQueryIO.read()
             .from("foo.com:project:somedataset.sometable")
             .usingStandardSql());
     p.run();
@@ -929,7 +929,7 @@ public class BigQueryIOTest implements Serializable {
 
     Pipeline p = TestPipeline.create(bqOptions);
     PCollection<String> output = p
-        .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
+        .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
             .withTestServices(fakeBqServices)
             .withoutValidation())
         .apply(ParDo.of(new DoFn<TableRow, String>() {
@@ -1260,7 +1260,7 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
 
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .from(tableSpec)
         .withoutResultFlattening()
         .usingStandardSql()
@@ -1276,7 +1276,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSourceDisplayDataQuery() {
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .fromQuery("myQuery")
         .withoutResultFlattening()
         .usingStandardSql()
@@ -1295,7 +1295,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1312,7 +1312,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.read()
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBigQueryIOGetName() {
-    assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
+    assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
     assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
   }
 
@@ -2317,7 +2317,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read read = BigQueryIO.Read.from(
+    BigQueryIO.Read read = BigQueryIO.read().from(
         options.getInputTable()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2330,7 +2330,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read = BigQueryIO.read().fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.


[04/10] beam git commit: Makes NameUtils recognize AutoValue classes

Posted by tg...@apache.org.
Makes NameUtils recognize AutoValue classes
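
A short sketch of the new behavior, using a hypothetical AutoValue transform:
AutoValue generates a subclass whose simple name carries an AutoValue_ prefix,
and the name approximation now delegates to the hand-written parent class
instead of reporting the generated name.

    import com.google.auto.value.AutoValue;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.util.NameUtils;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PDone;

    public class NamingSketch {
      @AutoValue
      abstract static class MyTransform extends PTransform<PBegin, PDone> {
        @Override
        public PDone expand(PBegin input) {
          return null; // present only to satisfy PTransform; never run here
        }
      }

      public static void main(String[] args) {
        // AutoValue generates AutoValue_NamingSketch_MyTransform extending
        // MyTransform; the approximation walks up to the declared class and
        // prints "NamingSketch.MyTransform" rather than the generated name.
        System.out.println(NameUtils.approximatePTransformName(
            AutoValue_NamingSketch_MyTransform.class));
      }
    }

This matters for the AutoValue conversion of BigQueryIO.Read elsewhere in this
series: testBigQueryIOGetName in BigQueryIOTest expects "BigQueryIO.Read", not
the AutoValue_-prefixed generated name, as the default transform name.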


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6b3e112
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6b3e112
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6b3e112

Branch: refs/heads/master
Commit: d6b3e112870b095d96f6f8d6bc165b88eb735952
Parents: 7d1f440
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 20:05:01 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/util/NameUtils.java   |  5 +++++
 .../java/org/apache/beam/sdk/util/NameUtilsTest.java    | 12 ++++++++++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d6b3e112/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index c67ccca..3f3054a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -151,6 +151,8 @@ public class NameUtils {
    * <ul>
    *   <li>1. It keeps the outer classes names.
    *   <li>2. It removes the common transform inner class: "Bound".
+   *   <li>3. For classes generated by AutoValue, whose names start with AutoValue_,
+   *     it delegates to the (parent) class declared in the user's source code.
    * </ul>
    *
    * <p>Examples:
@@ -161,6 +163,9 @@ public class NameUtils {
    */
   public static String approximatePTransformName(Class<?> clazz) {
     checkArgument(PTransform.class.isAssignableFrom(clazz));
+    if (clazz.getSimpleName().startsWith("AutoValue_")) {
+      return approximatePTransformName(clazz.getSuperclass());
+    }
     return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
         .replaceFirst("\\.Bound$", "");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d6b3e112/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index 6848ea4..c685a63 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
 
 import static org.junit.Assert.assertEquals;
 
+import com.google.auto.value.AutoValue;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils.NameOverride;
@@ -133,9 +134,20 @@ public class NameUtilsTest {
     assertEquals(
         "NameUtilsTest.EmbeddedPTransform",
         NameUtils.approximatePTransformName(transform.getBound().getClass()));
+    assertEquals(
+        "NameUtilsTest.SomeTransform",
+        NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
     assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
   }
 
+  @AutoValue
+  abstract static class SomeTransform extends PTransform<PBegin, PDone> {
+    @Override
+    public PDone expand(PBegin input) {
+      return null;
+    }
+  }
+
   @Test
   public void testPTransformNameWithAnonOuterClass() throws Exception {
     AnonymousClass anonymousClassObj = new AnonymousClass() {


[10/10] beam git commit: This closes #2149

Posted by tg...@apache.org.
This closes #2149


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/806c53c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/806c53c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/806c53c1

Branch: refs/heads/master
Commit: 806c53c18793b43e7ccd28f99662ee73a97237b4
Parents: cc12fd3 101715a
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 15:54:42 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:42 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |    2 +-
 .../examples/complete/StreamingWordExtract.java |    2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |    2 +-
 .../beam/examples/complete/TrafficRoutes.java   |    2 +-
 .../examples/cookbook/BigQueryTornadoes.java    |    4 +-
 .../cookbook/CombinePerKeyExamples.java         |    4 +-
 .../beam/examples/cookbook/FilterExamples.java  |    4 +-
 .../beam/examples/cookbook/JoinExamples.java    |    4 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |    4 +-
 .../beam/examples/cookbook/TriggerExample.java  |    2 +-
 .../complete/game/utils/WriteToBigQuery.java    |    2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |    2 +-
 .../org/apache/beam/sdk/io/package-info.java    |    2 +-
 .../org/apache/beam/sdk/util/NameUtils.java     |    5 +
 .../org/apache/beam/sdk/util/NameUtilsTest.java |   12 +
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 1707 ++++++++----------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  288 ++-
 17 files changed, 851 insertions(+), 1197 deletions(-)
----------------------------------------------------------------------



[02/10] beam git commit: Condense BigQueryIO.Read.Bound into BigQueryIO.Read

Posted by tg...@apache.org.
Condense BigQueryIO.Read.Bound into BigQueryIO.Read
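
A brief sketch of the user-visible effect, with a hypothetical table spec: the
static entry points keep their signatures, but they now return the transform
type directly, so the intermediate Read.Bound type disappears from user code.

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

    public class CondenseSketch {
      public static void main(String[] args) {
        // Before: from() returned the nested Bound type:
        //   BigQueryIO.Read.Bound read =
        //       BigQueryIO.Read.from("my-project:my_dataset.my_table");

        // After: Read itself is the PTransform, and configuration methods such
        // as withoutValidation() are instance methods returning another Read.
        BigQueryIO.Read read =
            BigQueryIO.Read.from("my-project:my_dataset.my_table")
                .withoutValidation();
        System.out.println(read.getName());
      }
    }

Commit 1a252a77 ([08/10] above) then builds on this by moving the remaining
static from()/fromQuery() entry points behind the BigQueryIO.read() factory.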


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/825338aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/825338aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/825338aa

Branch: refs/heads/master
Commit: 825338aaa5d7e5ead1afa13f63c65fb316e1aa6a
Parents: 30f3634
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:15:47 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:22 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 690 +++++++++----------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 117 ++--
 2 files changed, 359 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2902c2b..f6c8575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -39,7 +39,6 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -454,448 +453,379 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Read {
+  public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+    @Nullable final ValueProvider<String> jsonTableRef;
+    @Nullable final ValueProvider<String> query;
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
-    public static Bound from(String tableSpec) {
-      return new Bound().from(StaticValueProvider.of(tableSpec));
+    public static Read from(String tableSpec) {
+      return new Read().from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound from(ValueProvider<String> tableSpec) {
-      return new Bound().from(tableSpec);
+    public static Read from(ValueProvider<String> tableSpec) {
+      return new Read().from(tableSpec);
     }
 
     /**
      * Reads results received after executing the given query.
      */
-    public static Bound fromQuery(String query) {
-      return new Bound().fromQuery(StaticValueProvider.of(query));
+    public static Read fromQuery(String query) {
+      return new Read().fromQuery(StaticValueProvider.of(query));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound fromQuery(ValueProvider<String> query) {
-      return new Bound().fromQuery(query);
+    public static Read fromQuery(ValueProvider<String> query) {
+      return new Read().fromQuery(query);
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
-    public static Bound from(TableReference table) {
-      return new Bound().from(table);
+    public static Read from(TableReference table) {
+      return new Read().from(table);
     }
 
     /**
-     * Disables BigQuery table validation, which is enabled by default.
+     * Disable validation that the table exists or the query succeeds prior to pipeline
+     * submission. Basic validation (such as ensuring that a query or table is specified) still
+     * occurs.
      */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
+    final boolean validate;
+    @Nullable final Boolean flattenResults;
+    @Nullable final Boolean useLegacySql;
+    @Nullable BigQueryServices bigQueryServices;
+
+    @VisibleForTesting @Nullable String stepUuid;
+    @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
+    private static final String QUERY_VALIDATION_FAILURE_ERROR =
+        "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+        + " pipeline, This validation can be disabled using #withoutValidation.";
+
+    private Read() {
+      this(
+          null /* name */,
+          null /* query */,
+          null /* jsonTableRef */,
+          true /* validate */,
+          null /* flattenResults */,
+          null /* useLegacySql */,
+          null /* bigQueryServices */);
+    }
+
+    private Read(
+        String name, @Nullable ValueProvider<String> query,
+        @Nullable ValueProvider<String> jsonTableRef, boolean validate,
+        @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
+        @Nullable BigQueryServices bigQueryServices) {
+      super(name);
+      this.jsonTableRef = jsonTableRef;
+      this.query = query;
+      this.validate = validate;
+      this.flattenResults = flattenResults;
+      this.useLegacySql = useLegacySql;
+      this.bigQueryServices = bigQueryServices;
     }
 
     /**
-     * A {@link PTransform} that reads from a BigQuery table and returns a bounded
-     * {@link PCollection} of {@link TableRow TableRows}.
+     * Disable validation that the table exists or the query succeeds prior to pipeline
+     * submission. Basic validation (such as ensuring that a query or table is specified) still
+     * occurs.
      */
-    public static class Bound extends PTransform<PBegin, PCollection<TableRow>> {
-      @Nullable final ValueProvider<String> jsonTableRef;
-      @Nullable final ValueProvider<String> query;
-
-      /**
-       * Disable validation that the table exists or the query succeeds prior to pipeline
-       * submission. Basic validation (such as ensuring that a query or table is specified) still
-       * occurs.
-       */
-      final boolean validate;
-      @Nullable final Boolean flattenResults;
-      @Nullable final Boolean useLegacySql;
-      @Nullable BigQueryServices bigQueryServices;
-
-      @VisibleForTesting @Nullable String stepUuid;
-      @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
-      private static final String QUERY_VALIDATION_FAILURE_ERROR =
-          "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
-          + " pipeline, This validation can be disabled using #withoutValidation.";
-
-      private Bound() {
-        this(
-            null /* name */,
-            null /* query */,
-            null /* jsonTableRef */,
-            true /* validate */,
-            null /* flattenResults */,
-            null /* useLegacySql */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(
-          String name, @Nullable ValueProvider<String> query,
-          @Nullable ValueProvider<String> jsonTableRef, boolean validate,
-          @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
-          @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.query = query;
-        this.validate = validate;
-        this.flattenResults = flattenResults;
-        this.useLegacySql = useLegacySql;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(ValueProvider<String> tableSpec) {
-        return new Bound(
-            name, query,
-            NestedValueProvider.of(
-                NestedValueProvider.of(
-                    tableSpec, new TableSpecToTableRef()),
-                new TableRefToJson()),
-            validate, flattenResults, useLegacySql, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(TableReference table) {
-        return from(StaticValueProvider.of(toTableSpec(table)));
-      }
-
-      /**
-       * Returns a copy of this transform that reads the results of the specified query.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>By default, the query results will be flattened -- see
-       * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * Jobs documentation</a> for more information.  To disable flattening, use
-       * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
-       *
-       * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery
-       * Standard SQL dialect, use {@link BigQueryIO.Read.Bound#usingStandardSql}.
-       */
-      public Bound fromQuery(String query) {
-        return fromQuery(StaticValueProvider.of(query));
-      }
-
-      /**
-       * Like {@link #fromQuery(String)}, but from a {@link ValueProvider}.
-       */
-      public Bound fromQuery(ValueProvider<String> query) {
-        return new Bound(name, query, jsonTableRef, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
-            MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE),
-            bigQueryServices);
-      }
+    public Read withoutValidation() {
+      return new Read(
+          name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
+          bigQueryServices);
+    }
 
-      /**
-       * Disable validation that the table exists or the query succeeds prior to pipeline
-       * submission. Basic validation (such as ensuring that a query or table is specified) still
-       * occurs.
-       */
-      public Bound withoutValidation() {
-        return new Bound(
-            name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
-            bigQueryServices);
-      }
+    /**
+     * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
+     * flattening of query results</a>.
+     *
+     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
+     * from a table will cause an error during validation.
+     */
+    public Read withoutResultFlattening() {
+      return new Read(
+          name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
+          bigQueryServices);
+    }
 
-      /**
-       * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * flattening of query results</a>.
-       *
-       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
-       * from a table will cause an error during validation.
-       */
-      public Bound withoutResultFlattening() {
-        return new Bound(
-            name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
-            bigQueryServices);
-      }
+    /**
+     * Enables BigQuery's Standard SQL dialect when reading from a query.
+     *
+     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
+     * from a table will cause an error during validation.
+     */
+    public Read usingStandardSql() {
+      return new Read(
+          name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
+          bigQueryServices);
+    }
 
-      /**
-       * Enables BigQuery's Standard SQL dialect when reading from a query.
-       *
-       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
-       * from a table will cause an error during validation.
-       */
-      public Bound usingStandardSql() {
-        return new Bound(
-            name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
-            bigQueryServices);
-      }
+    @VisibleForTesting
+    Read withTestServices(BigQueryServices testServices) {
+      return new Read(
+          name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+    }
 
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(
-            name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+    @Override
+    public void validate(PBegin input) {
+      // Even if existence validation is disabled, we need to make sure that the BigQueryIO
+      // read is properly specified.
+      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+      String tempLocation = bqOptions.getTempLocation();
+      checkArgument(
+          !Strings.isNullOrEmpty(tempLocation),
+          "BigQueryIO.Read needs a GCS temp location to store temp files.");
+      if (bigQueryServices == null) {
+        try {
+          GcsPath.fromUri(tempLocation);
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                  tempLocation),
+              e);
+        }
       }
 
-      @Override
-      public void validate(PBegin input) {
-        // Even if existence validation is disabled, we need to make sure that the BigQueryIO
-        // read is properly specified.
-        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+      ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
 
-        String tempLocation = bqOptions.getTempLocation();
-        checkArgument(
-            !Strings.isNullOrEmpty(tempLocation),
-            "BigQueryIO.Read needs a GCS temp location to store temp files.");
-        if (bigQueryServices == null) {
-          try {
-            GcsPath.fromUri(tempLocation);
-          } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException(
-                String.format(
-                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                    tempLocation),
-                e);
-          }
-        }
-
-        ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
+      checkState(
+          table == null || query == null,
+          "Invalid BigQueryIO.Read: table reference and query may not both be set");
+      checkState(
+          table != null || query != null,
+          "Invalid BigQueryIO.Read: one of table reference and query must be set");
 
+      if (table != null) {
         checkState(
-            table == null || query == null,
-            "Invalid BigQueryIO.Read: table reference and query may not both be set");
+            flattenResults == null,
+            "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+                + " preference, which only applies to queries");
         checkState(
-            table != null || query != null,
-            "Invalid BigQueryIO.Read: one of table reference and query must be set");
-
-        if (table != null) {
-          checkState(
-              flattenResults == null,
-              "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
-                  + " preference, which only applies to queries");
-          checkState(
-              useLegacySql == null,
-              "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
-                  + " preference, which only applies to queries");
-        } else /* query != null */ {
-          checkState(flattenResults != null, "flattenResults should not be null if query is set");
-          checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
-        }
-
-        // Note that a table or query check can fail if the table or dataset are created by
-        // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
-        // For these cases the withoutValidation method can be used to disable the check.
-        if (validate && table != null) {
-          checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
-          // Check for source table presence for early failure notification.
-          DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
-          verifyDatasetPresence(datasetService, table.get());
-          verifyTablePresence(datasetService, table.get());
-        } else if (validate && query != null) {
-          checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
-          JobService jobService = getBigQueryServices().getJobService(bqOptions);
-          try {
-            jobService.dryRunQuery(
-                bqOptions.getProject(),
-                new JobConfigurationQuery()
-                    .setQuery(query.get())
-                    .setFlattenResults(flattenResults)
-                    .setUseLegacySql(useLegacySql));
-          } catch (Exception e) {
-            throw new IllegalArgumentException(
-                String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
-          }
-        }
-      }
-
-      @Override
-      public PCollection<TableRow> expand(PBegin input) {
-        stepUuid = randomUUIDString();
-        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-        jobUuid = NestedValueProvider.of(
-           StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
-        final ValueProvider<String> jobIdToken = NestedValueProvider.of(
-            jobUuid, new BeamJobUuidToBigQueryJobUuid());
-
-        BoundedSource<TableRow> source;
-        final BigQueryServices bqServices = getBigQueryServices();
-
-        final String extractDestinationDir;
-        String tempLocation = bqOptions.getTempLocation();
+            useLegacySql == null,
+            "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+                + " preference, which only applies to queries");
+      } else /* query != null */ {
+        checkState(flattenResults != null, "flattenResults should not be null if query is set");
+        checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+      }
+
+      // Note that a table or query check can fail if the table or dataset are created by
+      // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
+      // For these cases the withoutValidation method can be used to disable the check.
+      if (validate && table != null) {
+        checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
+        // Check for source table presence for early failure notification.
+        DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
+        verifyDatasetPresence(datasetService, table.get());
+        verifyTablePresence(datasetService, table.get());
+      } else if (validate && query != null) {
+        checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+        JobService jobService = getBigQueryServices().getJobService(bqOptions);
         try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          extractDestinationDir = factory.resolve(tempLocation, stepUuid);
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to resolve extract destination directory in %s", tempLocation));
-        }
-
-        final String executingProject = bqOptions.getProject();
-        if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
-          source = BigQueryQuerySource.create(
-              jobIdToken, query, NestedValueProvider.of(
-                jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
-              flattenResults, useLegacySql, extractDestinationDir, bqServices);
-        } else {
-          ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
-          source = BigQueryTableSource.create(
-              jobIdToken, inputTable, extractDestinationDir, bqServices,
-              StaticValueProvider.of(executingProject));
+          jobService.dryRunQuery(
+              bqOptions.getProject(),
+              new JobConfigurationQuery()
+                  .setQuery(query.get())
+                  .setFlattenResults(flattenResults)
+                  .setUseLegacySql(useLegacySql));
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
         }
-        PassThroughThenCleanup.CleanupOperation cleanupOperation =
-            new PassThroughThenCleanup.CleanupOperation() {
-              @Override
-              void cleanup(PipelineOptions options) throws Exception {
-                BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-
-                JobReference jobRef = new JobReference()
-                    .setProjectId(executingProject)
-                    .setJobId(getExtractJobId(jobIdToken));
-
-                Job extractJob = bqServices.getJobService(bqOptions)
-                    .getJob(jobRef);
-
-                Collection<String> extractFiles = null;
-                if (extractJob != null) {
-                  extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-                } else {
-                  IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
-                  Collection<String> dirMatch = factory.match(extractDestinationDir);
-                  if (!dirMatch.isEmpty()) {
-                    extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
-                  }
-                }
-                if (extractFiles != null && !extractFiles.isEmpty()) {
-                  new GcsUtilFactory().create(options).remove(extractFiles);
-                }
-              }};
-        return input.getPipeline()
-            .apply(org.apache.beam.sdk.io.Read.from(source))
-            .setCoder(getDefaultOutputCoder())
-            .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
       }
+    }
 
-      @Override
-      protected Coder<TableRow> getDefaultOutputCoder() {
-        return TableRowJsonCoder.of();
+    @Override
+    public PCollection<TableRow> expand(PBegin input) {
+      stepUuid = randomUUIDString();
+      BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+      jobUuid = NestedValueProvider.of(
+         StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+      final ValueProvider<String> jobIdToken = NestedValueProvider.of(
+          jobUuid, new BeamJobUuidToBigQueryJobUuid());
+
+      BoundedSource<TableRow> source;
+      final BigQueryServices bqServices = getBigQueryServices();
+
+      final String extractDestinationDir;
+      String tempLocation = bqOptions.getTempLocation();
+      try {
+        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+        extractDestinationDir = factory.resolve(tempLocation, stepUuid);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Failed to resolve extract destination directory in %s", tempLocation));
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
-              .withLabel("Table"))
-            .addIfNotNull(DisplayData.item("query", query)
-              .withLabel("Query"))
-            .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
-              .withLabel("Flatten Query Results"))
-            .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
-              .withLabel("Use Legacy SQL Dialect"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"),
-                true);
+      final String executingProject = bqOptions.getProject();
+      if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
+        source = BigQueryQuerySource.create(
+            jobIdToken, query, NestedValueProvider.of(
+              jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+            flattenResults, useLegacySql, extractDestinationDir, bqServices);
+      } else {
+        ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
+        source = BigQueryTableSource.create(
+            jobIdToken, inputTable, extractDestinationDir, bqServices,
+            StaticValueProvider.of(executingProject));
       }
+      PassThroughThenCleanup.CleanupOperation cleanupOperation =
+          new PassThroughThenCleanup.CleanupOperation() {
+            @Override
+            void cleanup(PipelineOptions options) throws Exception {
+              BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+              JobReference jobRef = new JobReference()
+                  .setProjectId(executingProject)
+                  .setJobId(getExtractJobId(jobIdToken));
+
+              Job extractJob = bqServices.getJobService(bqOptions)
+                  .getJob(jobRef);
+
+              Collection<String> extractFiles = null;
+              if (extractJob != null) {
+                extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+              } else {
+                IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+                Collection<String> dirMatch = factory.match(extractDestinationDir);
+                if (!dirMatch.isEmpty()) {
+                  extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+                }
+              }
+              if (extractFiles != null && !extractFiles.isEmpty()) {
+                new GcsUtilFactory().create(options).remove(extractFiles);
+              }
+            }};
+      return input.getPipeline()
+          .apply(org.apache.beam.sdk.io.Read.from(source))
+          .setCoder(getDefaultOutputCoder())
+          .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
+    }
 
-      /**
-       * Returns the table to read, or {@code null} if reading from a query instead.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
-          BigQueryOptions bqOptions) {
-        ValueProvider<TableReference> table = getTableProvider();
-        if (table == null) {
-          return table;
-        }
-        if (!table.isAccessible()) {
-          LOG.info("Using a dynamic value for table input. This must contain a project"
-              + " in the table reference: {}", table);
-          return table;
-        }
-        if (Strings.isNullOrEmpty(table.get().getProjectId())) {
-          // If user does not specify a project we assume the table to be located in
-          // the default project.
-          TableReference tableRef = table.get();
-          tableRef.setProjectId(bqOptions.getProject());
-          return NestedValueProvider.of(StaticValueProvider.of(
-              toJsonString(tableRef)), new JsonTableRefToTableRef());
-        }
+    @Override
+    protected Coder<TableRow> getDefaultOutputCoder() {
+      return TableRowJsonCoder.of();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+            .withLabel("Table"))
+          .addIfNotNull(DisplayData.item("query", query)
+            .withLabel("Query"))
+          .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+            .withLabel("Flatten Query Results"))
+          .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+            .withLabel("Use Legacy SQL Dialect"))
+          .addIfNotDefault(DisplayData.item("validation", validate)
+            .withLabel("Validation Enabled"),
+              true);
+    }
+
+    /**
+     * Returns the table to read, or {@code null} if reading from a query instead.
+     *
+     * <p>If the table's project is not specified, use the executing project.
+     */
+    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+        BigQueryOptions bqOptions) {
+      ValueProvider<TableReference> table = getTableProvider();
+      if (table == null) {
         return table;
       }
-
-      /**
-       * Returns the table to read, or {@code null} if reading from a query instead.
-       */
-      @Nullable
-      public ValueProvider<TableReference> getTableProvider() {
-        return jsonTableRef == null
-            ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+      if (!table.isAccessible()) {
+        LOG.info("Using a dynamic value for table input. This must contain a project"
+            + " in the table reference: {}", table);
+        return table;
       }
-      /**
-       * Returns the table to read, or {@code null} if reading from a query instead.
-       */
-      @Nullable
-      public TableReference getTable() {
-        ValueProvider<TableReference> provider = getTableProvider();
-        return provider == null ? null : provider.get();
+      if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+        // If user does not specify a project we assume the table to be located in
+        // the default project.
+        TableReference tableRef = table.get();
+        tableRef.setProjectId(bqOptions.getProject());
+        return NestedValueProvider.of(StaticValueProvider.of(
+            toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
+      return table;
+    }
 
-      /**
-       * Returns the query to be read, or {@code null} if reading from a table instead.
-       */
-      @Nullable
-      public String getQuery() {
-        return query == null ? null : query.get();
-      }
+    /**
+     * Returns the table to read, or {@code null} if reading from a query instead.
+     */
+    @Nullable
+    public ValueProvider<TableReference> getTableProvider() {
+      return jsonTableRef == null
+          ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+    }
+
+    /**
+     * Returns the table to read, or {@code null} if reading from a query instead.
+     */
+    @Nullable
+    public TableReference getTable() {
+      ValueProvider<TableReference> provider = getTableProvider();
+      return provider == null ? null : provider.get();
+    }
 
-      /**
-       * Returns the query to be read, or {@code null} if reading from a table instead.
-       */
-      @Nullable
-      public ValueProvider<String> getQueryProvider() {
-        return query;
-      }
+    /**
+     * Returns the query to be read, or {@code null} if reading from a table instead.
+     */
+    @Nullable
+    public String getQuery() {
+      return query == null ? null : query.get();
+    }
 
-      /**
-       * Returns true if table validation is enabled.
-       */
-      public boolean getValidate() {
-        return validate;
-      }
+    /**
+     * Returns the query to be read, or {@code null} if reading from a table instead.
+     */
+    @Nullable
+    public ValueProvider<String> getQueryProvider() {
+      return query;
+    }
 
-      /**
-       * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
-       */
-      public Boolean getFlattenResults() {
-        return flattenResults;
-      }
+    /**
+     * Returns true if table validation is enabled.
+     */
+    public boolean getValidate() {
+      return validate;
+    }
 
-      /**
-       * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
-       * if not applicable.
-       */
-      @Nullable
-      public Boolean getUseLegacySql() {
-        return useLegacySql;
-      }
+    /**
+     * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
+     */
+    public Boolean getFlattenResults() {
+      return flattenResults;
+    }
 
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices == null) {
-          bigQueryServices = new BigQueryServicesImpl();
-        }
-        return bigQueryServices;
-      }
+    /**
+     * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
+     * if not applicable.
+     */
+    @Nullable
+    public Boolean getUseLegacySql() {
+      return useLegacySql;
     }
 
-    /** Disallow construction of utility class. */
-    private Read() {}
+    private BigQueryServices getBigQueryServices() {
+      if (bigQueryServices == null) {
+        bigQueryServices = new BigQueryServicesImpl();
+      }
+      return bigQueryServices;
+    }
   }
 
   /**

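A note on the condensed expand() above: it picks between the two bounded sources at
pipeline construction time (BigQueryQuerySource when a query is set, otherwise
BigQueryTableSource with the project defaulted by getTableWithDefaultProject), and
registers a CleanupOperation that removes the extract files afterwards. A minimal
usage sketch of the two modes, assuming a Pipeline p as in the tests below; the
table spec and query text are placeholders:

    // Table mode: reads via BigQueryTableSource; a spec without a project
    // is resolved against the executing project.
    PCollection<TableRow> tableRows =
        p.apply("ReadTable",
            BigQueryIO.Read.from("somedataset.sometable").withoutValidation());

    // Query mode: reads via BigQueryQuerySource; extract files written under
    // tempLocation are deleted by the cleanup step shown above.
    PCollection<TableRow> queryRows =
        p.apply("ReadQuery",
            BigQueryIO.Read.fromQuery("foo_query").withoutValidation());
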
http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index c9061a3..bb1528b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -665,28 +665,28 @@ public class BigQueryIOTest implements Serializable {
   @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
   private void checkReadTableObject(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
-    checkReadTableObjectWithValidate(bound, project, dataset, table, true);
+      BigQueryIO.Read read, String project, String dataset, String table) {
+    checkReadTableObjectWithValidate(read, project, dataset, table, true);
   }
 
-  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
-    checkReadQueryObjectWithValidate(bound, query, true);
+  private void checkReadQueryObject(BigQueryIO.Read read, String query) {
+    checkReadQueryObjectWithValidate(read, query, true);
   }
 
   private void checkReadTableObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) {
-    assertEquals(project, bound.getTable().getProjectId());
-    assertEquals(dataset, bound.getTable().getDatasetId());
-    assertEquals(table, bound.getTable().getTableId());
-    assertNull(bound.query);
-    assertEquals(validate, bound.getValidate());
+      BigQueryIO.Read read, String project, String dataset, String table, boolean validate) {
+    assertEquals(project, read.getTable().getProjectId());
+    assertEquals(dataset, read.getTable().getDatasetId());
+    assertEquals(table, read.getTable().getTableId());
+    assertNull(read.query);
+    assertEquals(validate, read.getValidate());
   }
 
   private void checkReadQueryObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String query, boolean validate) {
-    assertNull(bound.getTable());
-    assertEquals(query, bound.getQuery());
-    assertEquals(validate, bound.getValidate());
+      BigQueryIO.Read read, String query, boolean validate) {
+    assertNull(read.getTable());
+    assertEquals(query, read.getQuery());
+    assertEquals(validate, read.getValidate());
   }
 
   private void checkWriteObject(
@@ -728,39 +728,39 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+    BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+    checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
-    checkReadQueryObject(bound, "foo_query");
+    BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+    checkReadQueryObject(read, "foo_query");
   }
 
   @Test
   public void testBuildTableBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
-    checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
+    checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
   }
 
   @Test
   public void testBuildQueryBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.fromQuery("some_query").withoutValidation();
-    checkReadQueryObjectWithValidate(bound, "some_query", false);
+    checkReadQueryObjectWithValidate(read, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.from("somedataset.sometable");
-    checkReadTableObject(bound, null, "somedataset", "sometable");
+    checkReadTableObject(read, null, "somedataset", "sometable");
   }
 
   @Test
@@ -769,8 +769,8 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+    BigQueryIO.Read read = BigQueryIO.Read.from(table);
+    checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
@@ -807,39 +807,6 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testBuildSourceWithoutTableQueryOrValidation() {
-    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQueryIO.Read: one of table reference and query must be set");
-    p.apply(BigQueryIO.Read.withoutValidation());
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testBuildSourceWithTableAndQuery() {
-    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQueryIO.Read: table reference and query may not both be set");
-    p.apply("ReadMyTable",
-        BigQueryIO.Read
-            .from("foo.com:project:somedataset.sometable")
-            .fromQuery("query"));
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
   public void testBuildSourceWithTableAndFlatten() {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
@@ -1291,12 +1258,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testBuildSourceDisplayData() {
+  public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
 
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .from(tableSpec)
-        .fromQuery("myQuery")
         .withoutResultFlattening()
         .usingStandardSql()
         .withoutValidation();
@@ -1304,6 +1270,21 @@ public class BigQueryIOTest implements Serializable {
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("flattenResults", false));
+    assertThat(displayData, hasDisplayItem("useLegacySql", false));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
+  public void testBuildSourceDisplayDataQuery() {
+    BigQueryIO.Read read = BigQueryIO.Read
+        .fromQuery("myQuery")
+        .withoutResultFlattening()
+        .usingStandardSql()
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+
     assertThat(displayData, hasDisplayItem("query", "myQuery"));
     assertThat(displayData, hasDisplayItem("flattenResults", false));
     assertThat(displayData, hasDisplayItem("useLegacySql", false));
@@ -1315,7 +1296,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1332,7 +1313,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -2375,7 +2356,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+    BigQueryIO.Read read = BigQueryIO.Read.from(
         options.getInputTable()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2388,7 +2369,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2497,10 +2478,10 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     Pipeline pipeline = TestPipeline.create(options);
     bqOptions.setTempLocation("gs://testbucket/testdir");
-    BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read1);
-    BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read2);
     assertNotEquals(read1.stepUuid, read2.stepUuid);


[09/10] beam git commit: Replace BigQueryIO.Write.to() with BigQueryIO.write().to()

Posted by tg...@apache.org.
Replace BigQueryIO.Write.to() with BigQueryIO.write().to()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/101715a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/101715a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/101715a7

Branch: refs/heads/master
Commit: 101715a788509aa9bdfdefd54eaceca35feca485
Parents: 1a252a7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:21:39 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:39 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |  2 +-
 .../examples/complete/StreamingWordExtract.java |  2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |  2 +-
 .../beam/examples/complete/TrafficRoutes.java   |  2 +-
 .../examples/cookbook/BigQueryTornadoes.java    |  2 +-
 .../cookbook/CombinePerKeyExamples.java         |  2 +-
 .../beam/examples/cookbook/FilterExamples.java  |  2 +-
 .../examples/cookbook/MaxPerKeyExamples.java    |  2 +-
 .../beam/examples/cookbook/TriggerExample.java  |  2 +-
 .../complete/game/utils/WriteToBigQuery.java    |  2 +-
 .../game/utils/WriteWindowedToBigQuery.java     |  2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 46 ++++++++++++--------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 46 ++++++++++----------
 13 files changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
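
Each file below changes in the same mechanical way; a representative before/after
sketch (rows is a PCollection<TableRow> and schema a TableSchema, both placeholders):

    // Before: the entry point was a static method on the Write class.
    rows.apply(BigQueryIO.Write.to("project:dataset.table").withSchema(schema));

    // After: a factory method mirroring BigQueryIO.read(); to() and the with*()
    // methods become ordinary instance methods on the returned Write.
    rows.apply(BigQueryIO.write().to("project:dataset.table").withSchema(schema));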


http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 861a292..fba3dc0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -491,7 +491,7 @@ public class AutoComplete {
 
       toWrite
         .apply(ParDo.of(new FormatForBigquery()))
-        .apply(BigQueryIO.Write
+        .apply(BigQueryIO.write()
                .to(tableRef)
                .withSchema(FormatForBigquery.getSchema())
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index e8d8950..2e7d451 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -136,7 +136,7 @@ public class StreamingWordExtract {
         .apply(ParDo.of(new ExtractWords()))
         .apply(ParDo.of(new Uppercase()))
         .apply(ParDo.of(new StringToRowConverter()))
-        .apply(BigQueryIO.Write.to(tableSpec)
+        .apply(BigQueryIO.write().to(tableSpec)
             .withSchema(StringToRowConverter.getSchema()));
 
     PipelineResult result = pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 412f7fb..c9508eb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -348,7 +348,7 @@ public class TrafficMaxLaneFlow {
             Duration.standardMinutes(options.getWindowDuration())).
             every(Duration.standardMinutes(options.getWindowSlideEvery()))))
         .apply(new MaxLaneFlow())
-        .apply(BigQueryIO.Write.to(tableRef)
+        .apply(BigQueryIO.write().to(tableRef)
             .withSchema(FormatMaxesFn.getSchema()));
 
     // Run the pipeline.

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 50d3ae4..fc5eb89 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -359,7 +359,7 @@ public class TrafficRoutes {
             Duration.standardMinutes(options.getWindowDuration())).
             every(Duration.standardMinutes(options.getWindowSlideEvery()))))
         .apply(new TrackSpeed())
-        .apply(BigQueryIO.Write.to(tableRef)
+        .apply(BigQueryIO.write().to(tableRef)
             .withSchema(FormatStatsFn.getSchema()));
 
     // Run the pipeline.

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index d3c9167..f8bc104 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -158,7 +158,7 @@ public class BigQueryTornadoes {
 
     p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new CountTornadoes())
-     .apply(BigQueryIO.Write
+     .apply(BigQueryIO.write()
         .to(options.getOutput())
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index fc54b13..a7016b0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -202,7 +202,7 @@ public class CombinePerKeyExamples {
 
     p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new PlaysForWord())
-     .apply(BigQueryIO.Write
+     .apply(BigQueryIO.write()
         .to(options.getOutput())
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 714a8f2..7adf7c6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -241,7 +241,7 @@ public class FilterExamples {
     p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(ParDo.of(new ProjectionFn()))
      .apply(new BelowGlobalMean(options.getMonthFilter()))
-     .apply(BigQueryIO.Write
+     .apply(BigQueryIO.write()
         .to(options.getOutput())
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 7e7bc72..a8dc7f9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -151,7 +151,7 @@ public class MaxPerKeyExamples {
 
     p.apply(BigQueryIO.read().from(options.getInput()))
      .apply(new MaxMeanTemp())
-     .apply(BigQueryIO.Write
+     .apply(BigQueryIO.write()
         .to(options.getOutput())
         .withSchema(schema)
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index af4a692..7048bde 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -452,7 +452,7 @@ public class TriggerExample {
         .apply(new CalculateTotalFlow(options.getWindowDuration()));
 
     for (int i = 0; i < resultList.size(); i++){
-      resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema()));
+      resultList.get(i).apply(BigQueryIO.write().to(tableRef).withSchema(getSchema()));
     }
 
     PipelineResult result = pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 1f33915..c124624 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -121,7 +121,7 @@ public class WriteToBigQuery<InputT>
   public PDone expand(PCollection<InputT> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
-      .apply(BigQueryIO.Write
+      .apply(BigQueryIO.write()
                 .to(getTable(teamAndScore.getPipeline(),
                     tableName))
                 .withSchema(getSchema())

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 7a4fb2c..7d16fa9 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T>
   public PDone expand(PCollection<T> teamAndScore) {
     return teamAndScore
       .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
-      .apply(BigQueryIO.Write
+      .apply(BigQueryIO.write()
                 .to(getTable(teamAndScore.getPipeline(),
                     tableName))
                 .withSchema(getSchema())

http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index dfb7ea6..3e699d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1351,12 +1351,12 @@ public class BigQueryIO {
    * exist.
    *
    * <p>By default, tables will be created if they do not exist, which corresponds to a
-   * {@link CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's
-   * Jobs API. A schema must be provided (via {@link BigQueryIO.Write#withSchema(TableSchema)}),
+   * {@link Write.CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of
+   * BigQuery's Jobs API. A schema must be provided (via {@link Write#withSchema(TableSchema)}),
    * or else the transform may fail at runtime with an {@link IllegalArgumentException}.
    *
    * <p>By default, writes require an empty table, which corresponds to
-   * a {@link WriteDisposition#WRITE_EMPTY} disposition that matches the
+   * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the
    * default of BigQuery's Jobs API.
    *
    * <p>Here is a sample transform that produces TableRow values containing
@@ -1371,6 +1371,16 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
+  public static Write write() {
+    return new AutoValue_BigQueryIO_Write.Builder()
+        .setValidate(true)
+        .setBigQueryServices(new BigQueryServicesImpl())
+        .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
+        .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
+        .build();
+  }
+
+  /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
     @VisibleForTesting
@@ -1416,14 +1426,6 @@ public class BigQueryIO {
       abstract Write build();
     }
 
-    private static Builder builder() {
-      return new AutoValue_BigQueryIO_Write.Builder()
-          .setValidate(true)
-          .setBigQueryServices(new BigQueryServicesImpl())
-          .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-          .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
-    }
-
     /**
      * An enumeration type for the BigQuery create disposition strings.
      *
@@ -1491,23 +1493,30 @@ public class BigQueryIO {
       WRITE_EMPTY
     }
 
+    /** Ensures that methods of the to() family are called at most once. */
+    private void ensureToNotCalledYet() {
+      checkState(
+          getJsonTableRef() == null && getTable() == null, "to() already called");
+    }
+
     /**
      * Creates a write transformation for the given table specification.
      *
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
      */
-    public static Write to(String tableSpec) {
+    public Write to(String tableSpec) {
       return to(StaticValueProvider.of(tableSpec));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(TableReference table) {
+    public Write to(TableReference table) {
       return to(StaticValueProvider.of(toTableSpec(table)));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(ValueProvider<String> tableSpec) {
-      return builder()
+    public Write to(ValueProvider<String> tableSpec) {
+      ensureToNotCalledYet();
+      return toBuilder()
           .setJsonTableRef(
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -1526,7 +1535,7 @@ public class BigQueryIO {
      * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
      * always return the same table specification.
      */
-    public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+    public Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
       return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
     }
 
@@ -1537,9 +1546,10 @@ public class BigQueryIO {
      * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
      * always return the same table reference.
      */
-    private static Write toTableReference(
+    private Write toTableReference(
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return builder().setTableRefFunction(tableRefFunction).build();
+      ensureToNotCalledYet();
+      return toBuilder().setTableRefFunction(tableRefFunction).build();
     }
 
     private static class TranslateTableSpecFunction implements

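Since write() now pre-populates the AutoValue builder, a fresh Write carries the
defaults the old static entry points set implicitly, and the to() family is guarded
against being applied twice. A sketch of the resulting behavior (table spec and
schema are placeholders):

    // Defaults from write(): validate=true, CREATE_IF_NEEDED, WRITE_EMPTY.
    BigQueryIO.Write write = BigQueryIO.write()
        .to("project:dataset.table")
        .withSchema(schema);  // required if the table may need to be created

    // A second destination is now rejected up front:
    // write.to("project:dataset.other") throws
    // IllegalStateException("to() already called").
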
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f6a7fb4..8a53d02 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -963,7 +963,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "b").set("number", 2),
         new TableRow().set("name", "c").set("number", 3))
         .withCoder(TableRowJsonCoder.of()))
-    .apply(BigQueryIO.Write.to("dataset-id.table-id")
+    .apply(BigQueryIO.write().to("dataset-id.table-id")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
         .withSchema(new TableSchema().setFields(
             ImmutableList.of(
@@ -997,7 +997,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "d").set("number", 4))
           .withCoder(TableRowJsonCoder.of()))
             .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
-            .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+            .apply(BigQueryIO.write().to("project-id:dataset-id.table-id")
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                 .withSchema(new TableSchema().setFields(
                     ImmutableList.of(
@@ -1153,7 +1153,7 @@ public class BigQueryIOTest implements Serializable {
     p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of()))
         .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
         .apply(Window.<TableRow>into(window))
-        .apply(BigQueryIO.Write
+        .apply(BigQueryIO.write()
             .to(tableFunction)
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
             .withSchema(new TableSchema().setFields(
@@ -1205,7 +1205,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "b").set("number", 2),
         new TableRow().set("name", "c").set("number", 3))
         .withCoder(TableRowJsonCoder.of()))
-    .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+    .apply(BigQueryIO.write().to("project-id:dataset-id.table-id")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER)
         .withTestServices(fakeBqServices)
         .withoutValidation());
@@ -1238,7 +1238,7 @@ public class BigQueryIOTest implements Serializable {
         new TableRow().set("name", "b").set("number", 2),
         new TableRow().set("name", "c").set("number", 3))
         .withCoder(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write.to("dataset-id.table-id")
+        .apply(BigQueryIO.write().to("dataset-id.table-id")
             .withCreateDisposition(CreateDisposition.CREATE_NEVER)
             .withTestServices(fakeBqServices)
             .withoutValidation());
@@ -1328,7 +1328,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBuildWrite() {
     BigQueryIO.Write write =
-            BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
+            BigQueryIO.write().to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1354,7 +1354,7 @@ public class BigQueryIOTest implements Serializable {
     options.as(StreamingOptions.class).setStreaming(streaming);
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("project:dataset.table")
         .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
         .withTestServices(new FakeBigQueryServices()
@@ -1375,7 +1375,7 @@ public class BigQueryIOTest implements Serializable {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
     BigQueryIO.Write write =
-        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
+        BigQueryIO.write().to("foo.com:project:somedataset.sometable").withoutValidation();
     checkWriteObjectWithValidate(
         write,
         "foo.com:project",
@@ -1390,7 +1390,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteDefaultProject() {
-    BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
+    BigQueryIO.Write write = BigQueryIO.write().to("somedataset.sometable");
     checkWriteObject(
         write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1402,7 +1402,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Write write = BigQueryIO.Write.to(table);
+    BigQueryIO.Write write = BigQueryIO.write().to(table);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1412,7 +1412,7 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildWriteWithSchema() {
     TableSchema schema = new TableSchema();
     BigQueryIO.Write write =
-        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
+        BigQueryIO.write().to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
         write, "foo.com:project", "somedataset", "sometable",
         schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteWithCreateDispositionNever() {
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
@@ -1430,7 +1430,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
@@ -1440,7 +1440,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteWithWriteDispositionTruncate() {
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
@@ -1450,7 +1450,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteWithWriteDispositionAppend() {
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
@@ -1460,7 +1460,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteWithWriteDispositionEmpty() {
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
@@ -1471,7 +1471,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBuildWriteWithWriteWithTableDescription() {
     final String tblDescription = "foo bar table";
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to("foo.com:project:somedataset.sometable")
         .withTableDescription(tblDescription);
     checkWriteObject(
@@ -1491,7 +1491,7 @@ public class BigQueryIOTest implements Serializable {
     TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
     final String tblDescription = "foo bar table";
 
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to(tableSpec)
         .withSchema(schema)
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1556,7 +1556,7 @@ public class BigQueryIOTest implements Serializable {
             .or(Matchers.containsString("BigQuery dataset not found for table")));
     tableRows
         .apply(
-            BigQueryIO.Write.to(tableRef)
+            BigQueryIO.write().to(tableRef)
                 .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                 .withSchema(new TableSchema())
                 .withTestServices(fakeBqServices));
@@ -1603,7 +1603,7 @@ public class BigQueryIOTest implements Serializable {
                 }))
         .setCoder(TableRowJsonCoder.of());
     tableRows
-        .apply(BigQueryIO.Write.to(tableRef)
+        .apply(BigQueryIO.write().to(tableRef)
             .withCreateDisposition(CreateDisposition.CREATE_NEVER)
             .withoutValidation());
   }
@@ -1675,7 +1675,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testBigQueryIOGetName() {
     assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
-    assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
+    assertEquals("BigQueryIO.Write", BigQueryIO.write().to("somedataset.sometable").getName());
   }
 
   @Test
@@ -1686,7 +1686,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage("no schema was provided");
     p
         .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write
+        .apply(BigQueryIO.write()
             .to("dataset.table")
             .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
   }
@@ -2343,7 +2343,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Write write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.write()
         .to(options.getOutputTable())
         .withSchema(NestedValueProvider.of(
             options.getOutputSchema(), new JsonSchemaToTableSchema()))

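The function-valued to() overload survives the change, so per-window destinations
still work with the new entry point; a sketch, where windowedRows and schema are
placeholders and the window-to-table mapping is illustrative only:

    windowedRows.apply(BigQueryIO.write()
        .to(new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            // Must be deterministic: the same window always maps to the same table.
            return "project:dataset.table_" + window.maxTimestamp().getMillis();
          }
        })
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));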

[03/10] beam git commit: Condense BigQueryIO.Write.Bound into BigQueryIO.Write

Posted by tg...@apache.org.
Condense BigQueryIO.Write.Bound into BigQueryIO.Write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d1f4400
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d1f4400
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d1f4400

Branch: refs/heads/master
Commit: 7d1f4400ab844c7b4e636482891be55174390431
Parents: 825338a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:26:09 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:24 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 1080 +++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  116 +-
 2 files changed, 552 insertions(+), 644 deletions(-)
----------------------------------------------------------------------
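
The net effect is that Write itself is now the PTransform<PCollection<TableRow>, PDone>
and every builder-style method returns a modified copy, so the Bound type disappears
from user code; for example (table spec is a placeholder):

    // Formerly typed as BigQueryIO.Write.Bound:
    BigQueryIO.Write write = BigQueryIO.Write
        .to("foo.com:project:somedataset.sometable")
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);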


http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f6c8575..90d7f67 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1415,7 +1415,43 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Write {
+  public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+    // Maximum number of files in a single partition.
+    static final int MAX_NUM_FILES = 10000;
+
+    // Maximum number of bytes in a single partition -- 11 TiB, just under BQ's 12 TiB limit.
+    static final long MAX_SIZE_BYTES = 11 * (1L << 40);
+
+    // The maximum number of retry jobs.
+    private static final int MAX_RETRY_JOBS = 3;
+
+    // The maximum number of retries to poll the status of a job.
+    // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+    private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+    @Nullable private final ValueProvider<String> jsonTableRef;
+
+    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+
+    // Table schema. The schema is required only if the table does not exist.
+    @Nullable private final ValueProvider<String> jsonSchema;
+
+    // Options for creating the table. Valid values are CREATE_IF_NEEDED and
+    // CREATE_NEVER.
+    final CreateDisposition createDisposition;
+
+    // Options for writing to the table. Valid values are WRITE_TRUNCATE,
+    // WRITE_APPEND and WRITE_EMPTY.
+    final WriteDisposition writeDisposition;
+
+    @Nullable
+    final String tableDescription;
+
+    // An option to indicate if table validation is desired. Default is true.
+    final boolean validate;
+
+    @Nullable private BigQueryServices bigQueryServices;
+
     /**
      * An enumeration type for the BigQuery create disposition strings.
      *
@@ -1488,18 +1524,18 @@ public class BigQueryIO {
      *
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
      */
-    public static Bound to(String tableSpec) {
-      return new Bound().to(tableSpec);
+    public static Write to(String tableSpec) {
+      return new Write().withTableSpec(tableSpec);
     }
 
     /** Creates a write transformation for the given table. */
-    public static Bound to(ValueProvider<String> tableSpec) {
-      return new Bound().to(tableSpec);
+    public static Write to(ValueProvider<String> tableSpec) {
+      return new Write().withTableSpec(tableSpec);
     }
 
     /** Creates a write transformation for the given table. */
-    public static Bound to(TableReference table) {
-      return new Bound().to(table);
+    public static Write to(TableReference table) {
+      return new Write().withTableRef(table);
     }
 
     /**
@@ -1513,8 +1549,8 @@ public class BigQueryIO {
      * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
      * always return the same table specification.
      */
-    public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return new Bound().to(tableSpecFunction);
+    public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+      return new Write().withTableSpec(tableSpecFunction);
     }
 
     /**
@@ -1524,634 +1560,547 @@ public class BigQueryIO {
      * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
      * always return the same table reference.
      */
-    public static Bound toTableReference(
+    private static Write toTableReference(
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Bound().toTableReference(tableRefFunction);
+      return new Write().withTableRef(tableRefFunction);
+    }
+
+    private static class TranslateTableSpecFunction implements
+        SerializableFunction<BoundedWindow, TableReference> {
+      private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+
+      TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+        this.tableSpecFunction = tableSpecFunction;
+      }
+
+      @Override
+      public TableReference apply(BoundedWindow value) {
+        return parseTableSpec(tableSpecFunction.apply(value));
+      }
+    }
+
+    private Write() {
+      this(
+          null /* name */,
+          null /* jsonTableRef */,
+          null /* tableRefFunction */,
+          null /* jsonSchema */,
+          CreateDisposition.CREATE_IF_NEEDED,
+          WriteDisposition.WRITE_EMPTY,
+          null /* tableDescription */,
+          true /* validate */,
+          null /* bigQueryServices */);
+    }
+
+    private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
+        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+        @Nullable ValueProvider<String> jsonSchema,
+        CreateDisposition createDisposition,
+        WriteDisposition writeDisposition,
+        @Nullable String tableDescription,
+        boolean validate,
+        @Nullable BigQueryServices bigQueryServices) {
+      super(name);
+      this.jsonTableRef = jsonTableRef;
+      this.tableRefFunction = tableRefFunction;
+      this.jsonSchema = jsonSchema;
+      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+      this.tableDescription = tableDescription;
+      this.validate = validate;
+      this.bigQueryServices = bigQueryServices;
     }
 
     /**
-     * Creates a write transformation with the specified schema to use in table creation.
+     * Returns a copy of this write transformation, but writing to the specified table. Refer to
+     * {@link #parseTableSpec(String)} for the specification format.
      *
-     * <p>The schema is <i>required</i> only if writing to a table that does not already
-     * exist, and {@link CreateDisposition} is set to
-     * {@link CreateDisposition#CREATE_IF_NEEDED}.
+     * <p>Does not modify this object.
      */
-    public static Bound withSchema(TableSchema schema) {
-      return new Bound().withSchema(schema);
+    private Write withTableSpec(String tableSpec) {
+      return withTableRef(NestedValueProvider.of(
+          StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
     }
 
     /**
-     * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+     * Returns a copy of this write transformation, but writing to the specified table.
+     *
+     * <p>Does not modify this object.
      */
-    public static Bound withSchema(ValueProvider<TableSchema> schema) {
-      return new Bound().withSchema(schema);
+    public Write withTableRef(TableReference table) {
+      return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
     }
 
-    /** Creates a write transformation with the specified options for creating the table. */
-    public static Bound withCreateDisposition(CreateDisposition disposition) {
-      return new Bound().withCreateDisposition(disposition);
+    /**
+     * Returns a copy of this write transformation, but writing to the specified table. Refer to
+     * {@link #parseTableSpec(String)} for the specification format.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableSpec(ValueProvider<String> tableSpec) {
+      return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
     }
 
-    /** Creates a write transformation with the specified options for writing to the table. */
-    public static Bound withWriteDisposition(WriteDisposition disposition) {
-      return new Bound().withWriteDisposition(disposition);
+    /**
+     * Returns a copy of this write transformation, but writing to the specified table.
+     *
+     * <p>Does not modify this object.
+     */
+    private Write withTableRef(ValueProvider<TableReference> table) {
+      return new Write(name,
+          NestedValueProvider.of(table, new TableRefToJson()),
+          tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, bigQueryServices);
     }
 
-    /** Creates a write transformation with the specified table description. */
-    public static Bound withTableDescription(@Nullable String tableDescription) {
-      return new Bound().withTableDescription(tableDescription);
+    /**
+     * Returns a copy of this write transformation, but using the specified function to determine
+     * which table to write to for each window.
+     *
+     * <p>Does not modify this object.
+     *
+     * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
+     * should always return the same table specification.
+     */
+    private Write withTableSpec(
+        SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+      return withTableRef(new TranslateTableSpecFunction(tableSpecFunction));
     }
 
     /**
-     * Creates a write transformation with BigQuery table validation disabled.
+     * Returns a copy of this write transformation, but using the specified function to determine
+     * which table to write to for each window.
+     *
+     * <p>Does not modify this object.
+     *
+     * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+     * always return the same table reference.
      */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
+    private Write withTableRef(
+        SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, bigQueryServices);
     }
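+
+    // Illustrative only: per-window table functions are typically used to shard
+    // output by window. A sketch (the suffix derivation is an assumption, not
+    // part of this change):
+    //
+    //   BigQueryIO.Write.to(new SerializableFunction<BoundedWindow, String>() {
+    //     @Override
+    //     public String apply(BoundedWindow window) {
+    //       return "project:dataset.table_" + window.maxTimestamp().getMillis();
+    //     }
+    //   });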
 
     /**
-     * A {@link PTransform} that can write either a bounded or unbounded
-     * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
+     * Returns a copy of this write transformation, but using the specified schema for rows
+     * to be written.
+     *
+     * <p>Does not modify this object.
      */
-    public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
-      // Maximum number of files in a single partition.
-      static final int MAX_NUM_FILES = 10000;
-
-      // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
-      static final long MAX_SIZE_BYTES = 11 * (1L << 40);
-
-      // The maximum number of retry jobs.
-      static final int MAX_RETRY_JOBS = 3;
-
-      // The maximum number of retries to poll the status of a job.
-      // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-      static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
-      @Nullable final ValueProvider<String> jsonTableRef;
-
-      @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
-      // Table schema. The schema is required only if the table does not exist.
-      @Nullable final ValueProvider<String> jsonSchema;
+    public Write withSchema(TableSchema schema) {
+      return new Write(name, jsonTableRef, tableRefFunction,
+          StaticValueProvider.of(toJsonString(schema)),
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // Options for creating the table. Valid values are CREATE_IF_NEEDED and
-      // CREATE_NEVER.
-      final CreateDisposition createDisposition;
+    /**
+     * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+     */
+    public Write withSchema(ValueProvider<TableSchema> schema) {
+      return new Write(name, jsonTableRef, tableRefFunction,
+          NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // Options for writing to the table. Valid values are WRITE_TRUNCATE,
-      // WRITE_APPEND and WRITE_EMPTY.
-      final WriteDisposition writeDisposition;
+    /**
+     * Returns a copy of this write transformation, but using the specified create disposition.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withCreateDisposition(CreateDisposition createDisposition) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      @Nullable final String tableDescription;
+    /**
+     * Returns a copy of this write transformation, but using the specified write disposition.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withWriteDisposition(WriteDisposition writeDisposition) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // An option to indicate if table validation is desired. Default is true.
-      final boolean validate;
+    /**
+     * Returns a copy of this write transformation, but using the specified table description.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableDescription(@Nullable String tableDescription) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      @Nullable private BigQueryServices bigQueryServices;
+    /**
+     * Returns a copy of this write transformation, but without BigQuery table validation.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withoutValidation() {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, false, bigQueryServices);
+    }
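+
+    // Illustrative only: the copy methods above are designed to chain, as the
+    // tests in BigQueryIOTest do (the schema variable is assumed):
+    //
+    //   BigQueryIO.Write.to("project:dataset.table")
+    //       .withSchema(schema)
+    //       .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+    //       .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+    //       .withoutValidation();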
 
-      private static class TranslateTableSpecFunction implements
-          SerializableFunction<BoundedWindow, TableReference> {
-        private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+    @VisibleForTesting
+    Write withTestServices(BigQueryServices testServices) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, testServices);
+    }
 
-        TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-          this.tableSpecFunction = tableSpecFunction;
+    private static void verifyTableNotExistOrEmpty(
+        DatasetService datasetService,
+        TableReference tableRef) {
+      try {
+        if (datasetService.getTable(tableRef) != null) {
+          checkState(
+              datasetService.isTableEmpty(tableRef),
+              "BigQuery table is not empty: %s.",
+              BigQueryIO.toTableSpec(tableRef));
         }
-
-        @Override
-        public TableReference apply(BoundedWindow value) {
-          return parseTableSpec(tableSpecFunction.apply(value));
+      } catch (IOException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
         }
+        throw new RuntimeException(
+            "unable to confirm BigQuery table emptiness for table "
+                + BigQueryIO.toTableSpec(tableRef), e);
       }
+    }
 
-      /**
-       * @deprecated Should be private. Instead, use one of the factory methods in
-       * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
-       * instance of this class.
-       */
-      @Deprecated
-      public Bound() {
-        this(
-            null /* name */,
-            null /* jsonTableRef */,
-            null /* tableRefFunction */,
-            null /* jsonSchema */,
-            CreateDisposition.CREATE_IF_NEEDED,
-            WriteDisposition.WRITE_EMPTY,
-            null /* tableDescription */,
-            true /* validate */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
-          @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-          @Nullable ValueProvider<String> jsonSchema,
-          CreateDisposition createDisposition,
-          WriteDisposition writeDisposition,
-          @Nullable String tableDescription,
-          boolean validate,
-          @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.tableRefFunction = tableRefFunction;
-        this.jsonSchema = jsonSchema;
-        this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-        this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-        this.tableDescription = tableDescription;
-        this.validate = validate;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String tableSpec) {
-        return toTableRef(NestedValueProvider.of(
-            StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
-      }
+    @Override
+    public void validate(PCollection<TableRow> input) {
+      BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(TableReference table) {
-        return to(StaticValueProvider.of(toTableSpec(table)));
-      }
+      // Exactly one of the table reference and the table function must be set.
+      checkState(
+          jsonTableRef != null || tableRefFunction != null,
+          "must set the table reference of a BigQueryIO.Write transform");
+      checkState(
+          jsonTableRef == null || tableRefFunction == null,
+          "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+              + " transform");
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(ValueProvider<String> tableSpec) {
-        return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+      // Require a schema if creating one or more tables.
+      checkArgument(
+          createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+      // The user specified a table.
+      if (jsonTableRef != null && validate) {
+        TableReference table = getTableWithDefaultProject(options).get();
+
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+        // Check for destination table presence and emptiness for early failure notification.
+        // Note that a presence check can fail when the table or dataset is created by an earlier
+        // stage of the pipeline. For these cases the #withoutValidation method can be used to
+        // disable the check.
+        verifyDatasetPresence(datasetService, table);
+        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+          verifyTablePresence(datasetService, table);
+        }
+        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+          verifyTableNotExistOrEmpty(datasetService, table);
+        }
       }
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      private Bound toTableRef(ValueProvider<TableReference> table) {
-        return new Bound(name,
-            NestedValueProvider.of(table, new TableRefToJson()),
-            tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+        // We will use BigQuery's streaming write API -- validate supported dispositions.
+        if (tableRefFunction != null) {
+          checkArgument(
+              createDisposition != CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+              + " function.");
+        }
+        if (jsonSchema == null) {
+          checkArgument(
+              createDisposition == CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+        }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
-       * should always return the same table specification.
-       */
-      public Bound to(
-          SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-        return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+        checkArgument(
+            writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+                + " when using a tablespec function.");
+      } else {
+        // We will use a BigQuery load job -- validate the temp location.
+        String tempLocation = options.getTempLocation();
+        checkArgument(
+            !Strings.isNullOrEmpty(tempLocation),
+            "BigQueryIO.Write needs a GCS temp location to store temp files.");
+        if (bigQueryServices == null) {
+          try {
+            GcsPath.fromUri(tempLocation);
+          } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                    tempLocation),
+                e);
+          }
+        }
       }
+    }
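+
+    // Illustrative only: for an unbounded input the streaming checks above
+    // apply, so a configuration that validates cleanly supplies a schema (or
+    // uses CREATE_NEVER) and avoids WRITE_TRUNCATE, e.g.:
+    //
+    //   BigQueryIO.Write.to("project:dataset.table")
+    //       .withSchema(schema)
+    //       .withWriteDisposition(WriteDisposition.WRITE_APPEND);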
 
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-       * always return the same table reference.
-       */
-      public Bound toTableReference(
-          SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, bigQueryServices);
+    @Override
+    public PDone expand(PCollection<TableRow> input) {
+      Pipeline p = input.getPipeline();
+      BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+      BigQueryServices bqServices = getBigQueryServices();
+
+      // When writing an unbounded PCollection, or when a tablespec function is defined, we use
+      // StreamWithDeDup and BigQuery's streaming import API.
+      if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+        return input.apply(
+            new StreamWithDeDup(getTable(), tableRefFunction,
+                jsonSchema == null ? null : NestedValueProvider.of(
+                    jsonSchema, new JsonSchemaToTableSchema()),
+                createDisposition,
+                tableDescription,
+                bqServices));
       }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified schema for rows
-       * to be written.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withSchema(TableSchema schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction,
-            StaticValueProvider.of(toJsonString(schema)),
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      /**
-       * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
-       */
-      public Bound withSchema(ValueProvider<TableSchema> schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction,
-            NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      String stepUuid = randomUUIDString();
 
-      /**
-       * Returns a copy of this write transformation, but using the specified create disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withCreateDisposition(CreateDisposition createDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      String tempLocation = options.getTempLocation();
+      String tempFilePrefix;
+      try {
+        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+        tempFilePrefix = factory.resolve(
+                factory.resolve(tempLocation, "BigQueryWriteTemp"),
+                stepUuid);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+            e);
       }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified write disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withWriteDisposition(WriteDisposition writeDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      // Create a singleton job ID token at execution time.
+      PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+      PCollectionView<String> jobIdTokenView = p
+          .apply("TriggerIdCreation", Create.of("ignored"))
+          .apply("CreateJobId", MapElements.via(
+              new SimpleFunction<String, String>() {
+                @Override
+                public String apply(String input) {
+                  return randomUUIDString();
+                }
+              }))
+          .apply(View.<String>asSingleton());
+
+      PCollection<TableRow> inputInGlobalWindow =
+          input.apply(
+              Window.<TableRow>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+
+      PCollection<KV<String, Long>> results = inputInGlobalWindow
+          .apply("WriteBundles",
+              ParDo.of(new WriteBundles(tempFilePrefix)));
+
+      TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+          new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+      TupleTag<KV<Long, List<String>>> singlePartitionTag =
+          new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+      PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+          .apply("ResultsView", View.<KV<String, Long>>asIterable());
+      PCollectionTuple partitions = singleton.apply(ParDo
+          .of(new WritePartition(
+              resultsView,
+              multiPartitionsTag,
+              singlePartitionTag))
+          .withSideInputs(resultsView)
+          .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+      // Write multiple partitions to separate temporary tables
+      PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+          .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+          .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+              false,
+              bqServices,
+              jobIdTokenView,
+              tempFilePrefix,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              jsonSchema,
+              WriteDisposition.WRITE_EMPTY,
+              CreateDisposition.CREATE_IF_NEEDED,
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
+
+      PCollectionView<Iterable<String>> tempTablesView = tempTables
+          .apply("TempTablesView", View.<String>asIterable());
+      singleton.apply(ParDo
+          .of(new WriteRename(
+              bqServices,
+              jobIdTokenView,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              writeDisposition,
+              createDisposition,
+              tempTablesView,
+              tableDescription))
+          .withSideInputs(tempTablesView, jobIdTokenView));
+
+      // Write single partition to final table
+      partitions.get(singlePartitionTag)
+          .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+          .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+              true,
+              bqServices,
+              jobIdTokenView,
+              tempFilePrefix,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              jsonSchema,
+              writeDisposition,
+              createDisposition,
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
 
-      /**
-       * Returns a copy of this write transformation, but using the specified table description.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withTableDescription(@Nullable String tableDescription) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      return PDone.in(input.getPipeline());
+    }
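+
+    // Illustrative summary of the load-job path assembled above:
+    //
+    //   rows -> WriteBundles (temp files under the GCS temp location)
+    //        -> WritePartition (group files within MAX_NUM_FILES / MAX_SIZE_BYTES)
+    //        -> WriteTables (one load job per partition; multiple partitions go
+    //           to temp tables first)
+    //        -> WriteRename (copy temp tables to the final destination table)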
 
-      /**
-       * Returns a copy of this write transformation, but without BigQuery table validation.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, false, bigQueryServices);
-      }
+    private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
+      private transient TableRowWriter writer = null;
+      private final String tempFilePrefix;
 
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, testServices);
+      WriteBundles(String tempFilePrefix) {
+        this.tempFilePrefix = tempFilePrefix;
       }
 
-      private static void verifyTableNotExistOrEmpty(
-          DatasetService datasetService,
-          TableReference tableRef) {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        if (writer == null) {
+          writer = new TableRowWriter(tempFilePrefix);
+          writer.open(UUID.randomUUID().toString());
+          LOG.debug("Done opening writer {}", writer);
+        }
         try {
-          if (datasetService.getTable(tableRef) != null) {
-            checkState(
-                datasetService.isTableEmpty(tableRef),
-                "BigQuery table is not empty: %s.",
-                BigQueryIO.toTableSpec(tableRef));
-          }
-        } catch (IOException | InterruptedException e) {
-          if (e instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
+          writer.write(c.element());
+        } catch (Exception e) {
+          // Discard the write result and close the writer.
+          try {
+            writer.close();
+            // The writer does not need to be reset, as this DoFn cannot be reused.
+          } catch (Exception closeException) {
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
           }
-          throw new RuntimeException(
-              "unable to confirm BigQuery table emptiness for table "
-                  + BigQueryIO.toTableSpec(tableRef), e);
+          throw e;
         }
       }
 
-      @Override
-      public void validate(PCollection<TableRow> input) {
-        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        // Exactly one of the table and table reference can be configured.
-        checkState(
-            jsonTableRef != null || tableRefFunction != null,
-            "must set the table reference of a BigQueryIO.Write transform");
-        checkState(
-            jsonTableRef == null || tableRefFunction == null,
-            "Cannot set both a table reference and a table function for a BigQueryIO.Write"
-                + " transform");
-
-        // Require a schema if creating one or more tables.
-        checkArgument(
-            createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
-            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
-        // The user specified a table.
-        if (jsonTableRef != null && validate) {
-          TableReference table = getTableWithDefaultProject(options).get();
-
-          DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-          // Check for destination table presence and emptiness for early failure notification.
-          // Note that a presence check can fail when the table or dataset is created by an earlier
-          // stage of the pipeline. For these cases the #withoutValidation method can be used to
-          // disable the check.
-          verifyDatasetPresence(datasetService, table);
-          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-            verifyTablePresence(datasetService, table);
-          }
-          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-            verifyTableNotExistOrEmpty(datasetService, table);
-          }
-        }
-
-        if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
-          // We will use BigQuery's streaming write API -- validate supported dispositions.
-          if (tableRefFunction != null) {
-            checkArgument(
-                createDisposition != CreateDisposition.CREATE_NEVER,
-                "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
-                + " function.");
-          }
-          if (jsonSchema == null) {
-            checkArgument(
-                createDisposition == CreateDisposition.CREATE_NEVER,
-                "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
-          }
-
-          checkArgument(
-              writeDisposition != WriteDisposition.WRITE_TRUNCATE,
-              "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
-                  + " when using a tablespec function.");
-        } else {
-          // We will use a BigQuery load job -- validate the temp location.
-          String tempLocation = options.getTempLocation();
-          checkArgument(
-              !Strings.isNullOrEmpty(tempLocation),
-              "BigQueryIO.Write needs a GCS temp location to store temp files.");
-          if (bigQueryServices == null) {
-            try {
-              GcsPath.fromUri(tempLocation);
-            } catch (IllegalArgumentException e) {
-              throw new IllegalArgumentException(
-                  String.format(
-                      "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                      tempLocation),
-                  e);
-            }
-          }
+      @FinishBundle
+      public void finishBundle(Context c) throws Exception {
+        if (writer != null) {
+          c.output(writer.close());
+          writer = null;
         }
       }
 
       @Override
-      public PDone expand(PCollection<TableRow> input) {
-        Pipeline p = input.getPipeline();
-        BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-        BigQueryServices bqServices = getBigQueryServices();
-
-        // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
-        // StreamWithDeDup and BigQuery's streaming import API.
-        if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
-          return input.apply(
-              new StreamWithDeDup(getTable(), tableRefFunction,
-                  jsonSchema == null ? null : NestedValueProvider.of(
-                      jsonSchema, new JsonSchemaToTableSchema()),
-                  createDisposition,
-                  tableDescription,
-                  bqServices));
-        }
-
-        ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
-        String stepUuid = randomUUIDString();
-
-        String tempLocation = options.getTempLocation();
-        String tempFilePrefix;
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          tempFilePrefix = factory.resolve(
-                  factory.resolve(tempLocation, "BigQueryWriteTemp"),
-                  stepUuid);
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-              e);
-        }
-
-        // Create a singleton job ID token at execution time.
-        PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-        PCollectionView<String> jobIdTokenView = p
-            .apply("TriggerIdCreation", Create.of("ignored"))
-            .apply("CreateJobId", MapElements.via(
-                new SimpleFunction<String, String>() {
-                  @Override
-                  public String apply(String input) {
-                    return randomUUIDString();
-                  }
-                }))
-            .apply(View.<String>asSingleton());
-
-        PCollection<TableRow> inputInGlobalWindow =
-            input.apply(
-                Window.<TableRow>into(new GlobalWindows())
-                    .triggering(DefaultTrigger.of())
-                    .discardingFiredPanes());
-
-        PCollection<KV<String, Long>> results = inputInGlobalWindow
-            .apply("WriteBundles",
-                ParDo.of(new WriteBundles(tempFilePrefix)));
-
-        TupleTag<KV<Long, List<String>>> multiPartitionsTag =
-            new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
-        TupleTag<KV<Long, List<String>>> singlePartitionTag =
-            new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
-        PCollectionView<Iterable<KV<String, Long>>> resultsView = results
-            .apply("ResultsView", View.<KV<String, Long>>asIterable());
-        PCollectionTuple partitions = singleton.apply(ParDo
-            .of(new WritePartition(
-                resultsView,
-                multiPartitionsTag,
-                singlePartitionTag))
-            .withSideInputs(resultsView)
-            .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
-        // Write multiple partitions to separate temporary tables
-        PCollection<String> tempTables = partitions.get(multiPartitionsTag)
-            .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
-            .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
-                false,
-                bqServices,
-                jobIdTokenView,
-                tempFilePrefix,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                jsonSchema,
-                WriteDisposition.WRITE_EMPTY,
-                CreateDisposition.CREATE_IF_NEEDED,
-                tableDescription))
-            .withSideInputs(jobIdTokenView));
-
-        PCollectionView<Iterable<String>> tempTablesView = tempTables
-            .apply("TempTablesView", View.<String>asIterable());
-        singleton.apply(ParDo
-            .of(new WriteRename(
-                bqServices,
-                jobIdTokenView,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                writeDisposition,
-                createDisposition,
-                tempTablesView,
-                tableDescription))
-            .withSideInputs(tempTablesView, jobIdTokenView));
-
-        // Write single partition to final table
-        partitions.get(singlePartitionTag)
-            .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
-            .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
-                true,
-                bqServices,
-                jobIdTokenView,
-                tempFilePrefix,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                jsonSchema,
-                writeDisposition,
-                createDisposition,
-                tableDescription))
-            .withSideInputs(jobIdTokenView));
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
 
-        return PDone.in(input.getPipeline());
+        builder
+            .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+                .withLabel("Temporary File Prefix"));
       }
+    }
 
-      private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
-        private transient TableRowWriter writer = null;
-        private final String tempFilePrefix;
-
-        WriteBundles(String tempFilePrefix) {
-          this.tempFilePrefix = tempFilePrefix;
-        }
-
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-          if (writer == null) {
-            writer = new TableRowWriter(tempFilePrefix);
-            writer.open(UUID.randomUUID().toString());
-            LOG.debug("Done opening writer {}", writer);
-          }
-          try {
-            writer.write(c.element());
-          } catch (Exception e) {
-            // Discard write result and close the write.
-            try {
-              writer.close();
-              // The writer does not need to be reset, as this DoFn cannot be reused.
-            } catch (Exception closeException) {
-              // Do not mask the exception that caused the write to fail.
-              e.addSuppressed(closeException);
-            }
-            throw e;
-          }
-        }
-
-        @FinishBundle
-        public void finishBundle(Context c) throws Exception {
-          if (writer != null) {
-            c.output(writer.close());
-            writer = null;
-          }
-        }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-          builder
-              .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-                  .withLabel("Temporary File Prefix"));
-        }
-      }
+      builder
+          .addIfNotNull(DisplayData.item("table", jsonTableRef)
+            .withLabel("Table Reference"))
+          .addIfNotNull(DisplayData.item("schema", jsonSchema)
+            .withLabel("Table Schema"));
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
+      if (tableRefFunction != null) {
+        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+          .withLabel("Table Reference Function"));
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        builder
-            .addIfNotNull(DisplayData.item("table", jsonTableRef)
-              .withLabel("Table Reference"))
-            .addIfNotNull(DisplayData.item("schema", jsonSchema)
-              .withLabel("Table Schema"));
-
-        if (tableRefFunction != null) {
-          builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
-            .withLabel("Table Reference Function"));
-        }
+      builder
+          .add(DisplayData.item("createDisposition", createDisposition.toString())
+            .withLabel("Table CreateDisposition"))
+          .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+            .withLabel("Table WriteDisposition"))
+          .addIfNotDefault(DisplayData.item("validation", validate)
+            .withLabel("Validation Enabled"), true)
+          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+            .withLabel("Table Description"));
+    }
 
-        builder
-            .add(DisplayData.item("createDisposition", createDisposition.toString())
-              .withLabel("Table CreateDisposition"))
-            .add(DisplayData.item("writeDisposition", writeDisposition.toString())
-              .withLabel("Table WriteDisposition"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"), true)
-            .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-              .withLabel("Table Description"));
-      }
+    /** Returns the create disposition. */
+    public CreateDisposition getCreateDisposition() {
+      return createDisposition;
+    }
 
-      /** Returns the create disposition. */
-      public CreateDisposition getCreateDisposition() {
-        return createDisposition;
-      }
+    /** Returns the write disposition. */
+    public WriteDisposition getWriteDisposition() {
+      return writeDisposition;
+    }
 
-      /** Returns the write disposition. */
-      public WriteDisposition getWriteDisposition() {
-        return writeDisposition;
-      }
+    /** Returns the table schema. */
+    public TableSchema getSchema() {
+      return fromJsonString(
+          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+    }
 
-      /** Returns the table schema. */
-      public TableSchema getSchema() {
-        return fromJsonString(
-            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+    /**
+     * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+     *
+     * <p>If the table's project is not specified, use the executing project.
+     */
+    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+        BigQueryOptions bqOptions) {
+      ValueProvider<TableReference> table = getTable();
+      if (table == null) {
+        return table;
       }
-
-      /**
-       * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
-          BigQueryOptions bqOptions) {
-        ValueProvider<TableReference> table = getTable();
-        if (table == null) {
-          return table;
-        }
-        if (!table.isAccessible()) {
-          LOG.info("Using a dynamic value for table input. This must contain a project"
-              + " in the table reference: {}", table);
-          return table;
-        }
-        if (Strings.isNullOrEmpty(table.get().getProjectId())) {
-          // If user does not specify a project we assume the table to be located in
-          // the default project.
-          TableReference tableRef = table.get();
-          tableRef.setProjectId(bqOptions.getProject());
-          return NestedValueProvider.of(StaticValueProvider.of(
-              toJsonString(tableRef)), new JsonTableRefToTableRef());
-        }
+      if (!table.isAccessible()) {
+        LOG.info("Using a dynamic value for table input. This must contain a project"
+            + " in the table reference: {}", table);
         return table;
       }
-
-      /** Returns the table reference, or {@code null}. */
-      @Nullable
-      public ValueProvider<TableReference> getTable() {
-        return jsonTableRef == null ? null :
-            NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+      if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+        // If the user does not specify a project, we assume the table is located
+        // in the default project.
+        TableReference tableRef = table.get();
+        tableRef.setProjectId(bqOptions.getProject());
+        return NestedValueProvider.of(StaticValueProvider.of(
+            toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
+      return table;
+    }
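+
+    // Illustrative only: with to("somedataset.sometable") the project is filled
+    // in here from BigQueryOptions.getProject(), which is the behavior
+    // testBuildWriteDefaultProject relies on.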
 
-      /** Returns {@code true} if table validation is enabled. */
-      public boolean getValidate() {
-        return validate;
-      }
+    /** Returns the table reference, or {@code null}. */
+    @Nullable
+    public ValueProvider<TableReference> getTable() {
+      return jsonTableRef == null ? null :
+          NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+    }
 
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices == null) {
-          bigQueryServices = new BigQueryServicesImpl();
-        }
-        return bigQueryServices;
+    /** Returns {@code true} if table validation is enabled. */
+    public boolean getValidate() {
+      return validate;
+    }
+
+    private BigQueryServices getBigQueryServices() {
+      if (bigQueryServices == null) {
+        bigQueryServices = new BigQueryServicesImpl();
       }
+      return bigQueryServices;
     }
 
     static class TableRowWriter {
@@ -2231,8 +2180,8 @@ public class BigQueryIO {
         List<String> currResults = Lists.newArrayList();
         for (int i = 0; i < results.size(); ++i) {
           KV<String, Long> fileResult = results.get(i);
-          if (currNumFiles + 1 > Bound.MAX_NUM_FILES
-              || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+          if (currNumFiles + 1 > Write.MAX_NUM_FILES
+              || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
             c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
             currResults = Lists.newArrayList();
             currNumFiles = 0;
@@ -2331,13 +2280,13 @@ public class BigQueryIO {
 
         String projectId = ref.getProjectId();
         Job lastFailedLoadJob = null;
-        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
-          Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
           Status jobStatus = parseStatus(loadJob);
           switch (jobStatus) {
             case SUCCEEDED:
@@ -2361,7 +2310,7 @@ public class BigQueryIO {
             "Failed to create load job with id prefix %s, "
                 + "reached max retries: %d, last failed load job: %s.",
             jobIdPrefix,
-            Bound.MAX_RETRY_JOBS,
+            Write.MAX_RETRY_JOBS,
             jobToPrettyString(lastFailedLoadJob)));
       }
 
@@ -2477,13 +2426,13 @@ public class BigQueryIO {
 
         String projectId = ref.getProjectId();
         Job lastFailedCopyJob = null;
-        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startCopyJob(jobRef, copyConfig);
-          Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
           Status jobStatus = parseStatus(copyJob);
           switch (jobStatus) {
             case SUCCEEDED:
@@ -2507,7 +2456,7 @@ public class BigQueryIO {
             "Failed to create copy job with id prefix %s, "
                 + "reached max retries: %d, last failed copy job: %s.",
             jobIdPrefix,
-            Bound.MAX_RETRY_JOBS,
+            Write.MAX_RETRY_JOBS,
             jobToPrettyString(lastFailedCopyJob)));
       }
 
@@ -2536,9 +2485,6 @@ public class BigQueryIO {
                 .withLabel("Create Disposition"));
       }
     }
-
-    /** Disallow construction of utility class. */
-    private Write() {}
   }
 
   private static String jobToPrettyString(@Nullable Job job) throws IOException {
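
Note on the WritePartition hunk above: temp files are packed greedily against
the per-load-job bounds that now live on Write, MAX_NUM_FILES (10000 files) and
MAX_SIZE_BYTES (11 TiB, just under BigQuery's 12 TiB limit). A minimal sketch
of the packing test, reusing the names from the hunk:

    // Start a new partition whenever adding the next file would exceed
    // either bound.
    boolean startNewPartition =
        currNumFiles + 1 > Write.MAX_NUM_FILES
            || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES;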

http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bb1528b..f403c5a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -690,11 +690,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   private void checkWriteObject(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      BigQueryIO.Write write, String project, String dataset, String table,
       TableSchema schema, CreateDisposition createDisposition,
       WriteDisposition writeDisposition, String tableDescription) {
     checkWriteObjectWithValidate(
-        bound,
+        write,
         project,
         dataset,
         table,
@@ -706,17 +706,17 @@ public class BigQueryIOTest implements Serializable {
   }
 
   private void checkWriteObjectWithValidate(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      BigQueryIO.Write write, String project, String dataset, String table,
       TableSchema schema, CreateDisposition createDisposition,
       WriteDisposition writeDisposition, String tableDescription, boolean validate) {
-    assertEquals(project, bound.getTable().get().getProjectId());
-    assertEquals(dataset, bound.getTable().get().getDatasetId());
-    assertEquals(table, bound.getTable().get().getTableId());
-    assertEquals(schema, bound.getSchema());
-    assertEquals(createDisposition, bound.createDisposition);
-    assertEquals(writeDisposition, bound.writeDisposition);
-    assertEquals(tableDescription, bound.tableDescription);
-    assertEquals(validate, bound.validate);
+    assertEquals(project, write.getTable().get().getProjectId());
+    assertEquals(dataset, write.getTable().get().getDatasetId());
+    assertEquals(table, write.getTable().get().getTableId());
+    assertEquals(schema, write.getSchema());
+    assertEquals(createDisposition, write.createDisposition);
+    assertEquals(writeDisposition, write.writeDisposition);
+    assertEquals(tableDescription, write.tableDescription);
+    assertEquals(validate, write.validate);
   }
 
   @Before
@@ -1328,10 +1328,10 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWrite() {
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
             BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
@@ -1355,7 +1355,7 @@ public class BigQueryIOTest implements Serializable {
     options.as(StreamingOptions.class).setStreaming(streaming);
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("project:dataset.table")
         .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
         .withTestServices(new FakeBigQueryServices()
@@ -1375,10 +1375,10 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildWriteWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
         BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
     checkWriteObjectWithValidate(
-        bound,
+        write,
         "foo.com:project",
         "somedataset",
         "sometable",
@@ -1391,9 +1391,9 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteDefaultProject() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
+    BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
     checkWriteObject(
-        bound, null, "somedataset", "sometable",
+        write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
@@ -1403,89 +1403,80 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
+    BigQueryIO.Write write = BigQueryIO.Write.to(table);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testBuildWriteWithoutTable() {
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference");
-    p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
   public void testBuildWriteWithSchema() {
     TableSchema schema = new TableSchema();
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
         BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithCreateDispositionNever() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionTruncate() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionAppend() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionEmpty() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithWriteWithTableDescription() {
     final String tblDescription = "foo bar table";
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withTableDescription(tblDescription);
     checkWriteObject(
-        bound,
+        write,
         "foo.com:project",
         "somedataset",
         "sometable",
@@ -1501,7 +1492,7 @@ public class BigQueryIOTest implements Serializable {
     TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
     final String tblDescription = "foo bar table";
 
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to(tableSpec)
         .withSchema(schema)
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1702,35 +1693,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testWriteValidateFailsTableAndTableSpec() {
-    p.enableAbandonedNodeEnforcement(false);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Cannot set both a table reference and a table function");
-    p
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write
-            .to("dataset.table")
-            .to(new SerializableFunction<BoundedWindow, String>() {
-              @Override
-              public String apply(BoundedWindow input) {
-                return null;
-              }
-            }));
-  }
-
-  @Test
-  public void testWriteValidateFailsNoTableAndNoTableSpec() {
-    p.enableAbandonedNodeEnforcement(false);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
-    p
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply("name", BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
   public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(mockJobService)
@@ -2094,7 +2056,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionSinglePartition() throws Exception {
-    long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+    long numFiles = BigQueryIO.Write.MAX_NUM_FILES;
     long fileSize = 1;
 
     // One partition is needed.
@@ -2104,7 +2066,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionManyFiles() throws Exception {
-    long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+    long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3;
     long fileSize = 1;
 
     // One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
@@ -2115,7 +2077,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWritePartitionLargeFileSize() throws Exception {
     long numFiles = 10;
-    long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+    long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3;
 
     // One partition is needed for each group of three files.
     long expectedNumPartitions = 4;
@@ -2382,7 +2344,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to(options.getOutputTable())
         .withSchema(NestedValueProvider.of(
             options.getOutputSchema(), new JsonSchemaToTableSchema()))
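
The hunks above finish migrating the tests off the removed nested type: BigQueryIO.Write.Bound becomes BigQueryIO.Write everywhere, including the MAX_NUM_FILES and MAX_SIZE_BYTES constants. For orientation, here is a minimal sketch of the renamed API in use, assuming an existing PCollection<TableRow> named rows and a TableSchema named schema (the table spec is illustrative, not taken from the diff):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

    // rows is an existing PCollection<TableRow>; schema is an existing TableSchema.
    BigQueryIO.Write write = BigQueryIO.Write
        .to("example-project:somedataset.sometable")   // destination table spec
        .withSchema(schema)                            // required for CREATE_IF_NEEDED
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND);
    rows.apply(write);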


[05/10] beam git commit: Use AutoValue for BigQueryIO.Read

Posted by tg...@apache.org.
Use AutoValue for BigQueryIO.Read


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32cba321
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32cba321
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32cba321

Branch: refs/heads/master
Commit: 32cba321c04c5e3fb18856c84ea10b3513264dd5
Parents: d6b3e11
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:51:04 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 215 +++++++------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  21 +-
 2 files changed, 84 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
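
The diff below is the stock AutoValue recipe: every field becomes an abstract getter, construction moves into a generated Builder, and an abstract toBuilder() enables copy-on-write updates such as withoutValidation(). As a quick orientation, the general shape looks like this (a generic sketch with hypothetical names, not the Beam code itself):

    import com.google.auto.value.AutoValue;

    @AutoValue
    abstract class ReadConfig {
      abstract String query();        // one abstract getter per field
      abstract boolean validate();

      abstract Builder toBuilder();   // copies this instance back into a Builder

      static Builder builder() {
        // AutoValue generates the concrete AutoValue_ReadConfig implementation.
        return new AutoValue_ReadConfig.Builder().setValidate(true);
      }

      ReadConfig withoutValidation() {
        return toBuilder().setValidate(false).build();  // immutable copy-on-write
      }

      @AutoValue.Builder
      abstract static class Builder {
        abstract Builder setQuery(String query);
        abstract Builder setValidate(boolean validate);
        abstract ReadConfig build();
      }
    }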


http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 90d7f67..e5db60e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -36,9 +36,11 @@ import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -453,97 +455,86 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
-    @Nullable final ValueProvider<String> jsonTableRef;
-    @Nullable final ValueProvider<String> query;
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+    @Nullable abstract ValueProvider<String> getJsonTableRef();
+    @Nullable abstract ValueProvider<String> getQuery();
+    abstract boolean getValidate();
+    @Nullable abstract Boolean getFlattenResults();
+    @Nullable abstract Boolean getUseLegacySql();
+    @Nullable abstract BigQueryServices getBigQueryServices();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+      abstract Builder setQuery(ValueProvider<String> query);
+      abstract Builder setValidate(boolean validate);
+      abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
+      abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
+      abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+
+      abstract Read build();
+    }
+
+    private static Builder builder() {
+      return new AutoValue_BigQueryIO_Read.Builder()
+          .setValidate(true)
+          .setBigQueryServices(new BigQueryServicesImpl());
+    }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
     public static Read from(String tableSpec) {
-      return new Read().from(StaticValueProvider.of(tableSpec));
+      return from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
     public static Read from(ValueProvider<String> tableSpec) {
-      return new Read().from(tableSpec);
+      return builder()
+          .setJsonTableRef(
+              NestedValueProvider.of(
+                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+                  new TableRefToJson())).build();
     }
 
     /**
      * Reads results received after executing the given query.
      */
     public static Read fromQuery(String query) {
-      return new Read().fromQuery(StaticValueProvider.of(query));
+      return fromQuery(StaticValueProvider.of(query));
     }
 
     /**
     * Same as {@code fromQuery(String)}, but with a {@link ValueProvider}.
      */
     public static Read fromQuery(ValueProvider<String> query) {
-      return new Read().fromQuery(query);
+      return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
     public static Read from(TableReference table) {
-      return new Read().from(table);
+      return from(StaticValueProvider.of(toTableSpec(table)));
     }
 
-    /**
-     * Disable validation that the table exists or the query succeeds prior to pipeline
-     * submission. Basic validation (such as ensuring that a query or table is specified) still
-     * occurs.
-     */
-    final boolean validate;
-    @Nullable final Boolean flattenResults;
-    @Nullable final Boolean useLegacySql;
-    @Nullable BigQueryServices bigQueryServices;
-
-    @VisibleForTesting @Nullable String stepUuid;
-    @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
     private static final String QUERY_VALIDATION_FAILURE_ERROR =
         "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
         + " pipeline, This validation can be disabled using #withoutValidation.";
 
-    private Read() {
-      this(
-          null /* name */,
-          null /* query */,
-          null /* jsonTableRef */,
-          true /* validate */,
-          null /* flattenResults */,
-          null /* useLegacySql */,
-          null /* bigQueryServices */);
-    }
-
-    private Read(
-        String name, @Nullable ValueProvider<String> query,
-        @Nullable ValueProvider<String> jsonTableRef, boolean validate,
-        @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
-        @Nullable BigQueryServices bigQueryServices) {
-      super(name);
-      this.jsonTableRef = jsonTableRef;
-      this.query = query;
-      this.validate = validate;
-      this.flattenResults = flattenResults;
-      this.useLegacySql = useLegacySql;
-      this.bigQueryServices = bigQueryServices;
-    }
-
     /**
      * Disable validation that the table exists or the query succeeds prior to pipeline
      * submission. Basic validation (such as ensuring that a query or table is specified) still
      * occurs.
      */
     public Read withoutValidation() {
-      return new Read(
-          name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
-          bigQueryServices);
+      return toBuilder().setValidate(false).build();
     }
 
     /**
@@ -554,9 +545,7 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read withoutResultFlattening() {
-      return new Read(
-          name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
-          bigQueryServices);
+      return toBuilder().setFlattenResults(false).build();
     }
 
     /**
@@ -566,15 +555,12 @@ public class BigQueryIO {
      * from a table will cause an error during validation.
      */
     public Read usingStandardSql() {
-      return new Read(
-          name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
-          bigQueryServices);
+      return toBuilder().setUseLegacySql(false).build();
     }
 
     @VisibleForTesting
     Read withTestServices(BigQueryServices testServices) {
-      return new Read(
-          name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+      return toBuilder().setBigQueryServices(testServices).build();
     }
 
     @Override
@@ -587,7 +573,7 @@ public class BigQueryIO {
       checkArgument(
           !Strings.isNullOrEmpty(tempLocation),
           "BigQueryIO.Read needs a GCS temp location to store temp files.");
-      if (bigQueryServices == null) {
+      if (getBigQueryServices() == null) {
         try {
           GcsPath.fromUri(tempLocation);
         } catch (IllegalArgumentException e) {
@@ -602,63 +588,65 @@ public class BigQueryIO {
       ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
 
       checkState(
-          table == null || query == null,
+          table == null || getQuery() == null,
           "Invalid BigQueryIO.Read: table reference and query may not both be set");
       checkState(
-          table != null || query != null,
+          table != null || getQuery() != null,
           "Invalid BigQueryIO.Read: one of table reference and query must be set");
 
       if (table != null) {
         checkState(
-            flattenResults == null,
+            getFlattenResults() == null,
             "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
                 + " preference, which only applies to queries");
         checkState(
-            useLegacySql == null,
+            getUseLegacySql() == null,
             "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
                 + " preference, which only applies to queries");
       } else /* query != null */ {
-        checkState(flattenResults != null, "flattenResults should not be null if query is set");
-        checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+        checkState(
+            getFlattenResults() != null, "flattenResults should not be null if query is set");
+        checkState(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
       }
 
       // Note that a table or query check can fail if the table or dataset is created by
       // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
       // For these cases the withoutValidation method can be used to disable the check.
-      if (validate && table != null) {
+      if (getValidate() && table != null) {
         checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
         // Check for source table presence for early failure notification.
         DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
         verifyDatasetPresence(datasetService, table.get());
         verifyTablePresence(datasetService, table.get());
-      } else if (validate && query != null) {
-        checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+      } else if (getValidate() && getQuery() != null) {
+        checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
         JobService jobService = getBigQueryServices().getJobService(bqOptions);
         try {
           jobService.dryRunQuery(
               bqOptions.getProject(),
               new JobConfigurationQuery()
-                  .setQuery(query.get())
-                  .setFlattenResults(flattenResults)
-                  .setUseLegacySql(useLegacySql));
+                  .setQuery(getQuery().get())
+                  .setFlattenResults(getFlattenResults())
+                  .setUseLegacySql(getUseLegacySql()));
         } catch (Exception e) {
           throw new IllegalArgumentException(
-              String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
+              String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
         }
       }
     }
 
     @Override
     public PCollection<TableRow> expand(PBegin input) {
-      stepUuid = randomUUIDString();
+      String stepUuid = randomUUIDString();
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-      jobUuid = NestedValueProvider.of(
+      ValueProvider<String> jobUuid = NestedValueProvider.of(
          StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
       final ValueProvider<String> jobIdToken = NestedValueProvider.of(
           jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
       BoundedSource<TableRow> source;
-      final BigQueryServices bqServices = getBigQueryServices();
+      final BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
@@ -671,11 +659,18 @@ public class BigQueryIO {
       }
 
       final String executingProject = bqOptions.getProject();
-      if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
-        source = BigQueryQuerySource.create(
-            jobIdToken, query, NestedValueProvider.of(
-              jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
-            flattenResults, useLegacySql, extractDestinationDir, bqServices);
+      if (getQuery() != null
+          && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
+        source =
+            BigQueryQuerySource.create(
+                jobIdToken,
+                getQuery(),
+                NestedValueProvider.of(
+                    jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+                getFlattenResults(),
+                getUseLegacySql(),
+                extractDestinationDir,
+                bqServices);
       } else {
         ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
         source = BigQueryTableSource.create(
@@ -726,13 +721,13 @@ public class BigQueryIO {
       builder
           .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
             .withLabel("Table"))
-          .addIfNotNull(DisplayData.item("query", query)
+          .addIfNotNull(DisplayData.item("query", getQuery())
             .withLabel("Query"))
-          .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+          .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults())
             .withLabel("Flatten Query Results"))
-          .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+          .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql())
             .withLabel("Use Legacy SQL Dialect"))
-          .addIfNotDefault(DisplayData.item("validation", validate)
+          .addIfNotDefault(DisplayData.item("validation", getValidate())
             .withLabel("Validation Enabled"),
               true);
     }
@@ -769,8 +764,8 @@ public class BigQueryIO {
      */
     @Nullable
     public ValueProvider<TableReference> getTableProvider() {
-      return jsonTableRef == null
-          ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+      return getJsonTableRef() == null
+          ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
     /**
      * Returns the table to read, or {@code null} if reading from a query instead.
@@ -780,52 +775,6 @@ public class BigQueryIO {
       ValueProvider<TableReference> provider = getTableProvider();
       return provider == null ? null : provider.get();
     }
-
-    /**
-     * Returns the query to be read, or {@code null} if reading from a table instead.
-     */
-    @Nullable
-    public String getQuery() {
-      return query == null ? null : query.get();
-    }
-
-    /**
-     * Returns the query to be read, or {@code null} if reading from a table instead.
-     */
-    @Nullable
-    public ValueProvider<String> getQueryProvider() {
-      return query;
-    }
-
-    /**
-     * Returns true if table validation is enabled.
-     */
-    public boolean getValidate() {
-      return validate;
-    }
-
-    /**
-     * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
-     */
-    public Boolean getFlattenResults() {
-      return flattenResults;
-    }
-
-    /**
-     * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
-     * if not applicable.
-     */
-    @Nullable
-    public Boolean getUseLegacySql() {
-      return useLegacySql;
-    }
-
-    private BigQueryServices getBigQueryServices() {
-      if (bigQueryServices == null) {
-        bigQueryServices = new BigQueryServicesImpl();
-      }
-      return bigQueryServices;
-    }
   }
 
   /**
@@ -1863,7 +1812,7 @@ public class BigQueryIO {
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      String stepUuid = randomUUIDString();
+      final String stepUuid = randomUUIDString();
 
       String tempLocation = options.getTempLocation();
       String tempFilePrefix;
@@ -1886,7 +1835,7 @@ public class BigQueryIO {
               new SimpleFunction<String, String>() {
                 @Override
                 public String apply(String input) {
-                  return randomUUIDString();
+                  return stepUuid;
                 }
               }))
           .apply(View.<String>asSingleton());

http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f403c5a..fdaa81c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -678,14 +677,14 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(project, read.getTable().getProjectId());
     assertEquals(dataset, read.getTable().getDatasetId());
     assertEquals(table, read.getTable().getTableId());
-    assertNull(read.query);
+    assertNull(read.getQuery());
     assertEquals(validate, read.getValidate());
   }
 
   private void checkReadQueryObjectWithValidate(
       BigQueryIO.Read read, String query, boolean validate) {
     assertNull(read.getTable());
-    assertEquals(query, read.getQuery());
+    assertEquals(query, read.getQuery().get());
     assertEquals(validate, read.getValidate());
   }
 
@@ -2433,20 +2432,4 @@ public class BigQueryIOTest implements Serializable {
                 BigQueryIO.TableRowInfoCoder.of()),
             IntervalWindow.getCoder()));
   }
-
-  @Test
-  public void testUniqueStepIdRead() {
-    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    Pipeline pipeline = TestPipeline.create(options);
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-    BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
-        options.getInputQuery()).withoutValidation();
-    pipeline.apply(read1);
-    BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
-        options.getInputQuery()).withoutValidation();
-    pipeline.apply(read2);
-    assertNotEquals(read1.stepUuid, read2.stepUuid);
-    assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
-  }
 }


[07/10] beam git commit: Simplify configuration of StreamWithDeDup

Posted by tg...@apache.org.
Simplify configuration of StreamWithDeDup


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1adcbaea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1adcbaea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1adcbaea

Branch: refs/heads/master
Commit: 1adcbaea799e83016ec91f7b7155c3a25804ce6c
Parents: 5c71589
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:19:51 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:32 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 73 +++++++-------------
 1 file changed, 26 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
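
The change is small but deliberate: rather than copying six configuration values into StreamWithDeDup's constructor, the inner transform now keeps a single reference to the configured Write and pulls settings through its getters at expansion time. A generic sketch of that delegation pattern (hypothetical names, not the Beam classes):

    // Hypothetical sketch of the delegation pattern; not the Beam classes.
    class Config {
      final String table;
      final boolean validate;
      Config(String table, boolean validate) {
        this.table = table;
        this.validate = validate;
      }
    }

    class Helper {
      private final Config config;  // one reference replaces one field per option
      Helper(Config config) {
        this.config = config;
      }
      void run() {
        // Settings are read at use time, so adding a Config field never
        // touches this constructor again.
        System.out.println(config.table + " validate=" + config.validate);
      }
    }

A side effect visible in the diff: once getBigQueryServices() is preset by the builder and declared non-null, the MoreObjects.firstNonNull fallback can be dropped as well.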


http://git-wip-us.apache.org/repos/asf/beam/blob/1adcbaea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d2f6ba6..e039c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,7 +40,6 @@ import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -462,7 +461,7 @@ public class BigQueryIO {
     abstract boolean getValidate();
     @Nullable abstract Boolean getFlattenResults();
     @Nullable abstract Boolean getUseLegacySql();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
 
@@ -645,8 +644,6 @@ public class BigQueryIO {
           jobUuid, new BeamJobUuidToBigQueryJobUuid());
 
       BoundedSource<TableRow> source;
-      final BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       final String extractDestinationDir;
       String tempLocation = bqOptions.getTempLocation();
@@ -670,11 +667,11 @@ public class BigQueryIO {
                 getFlattenResults(),
                 getUseLegacySql(),
                 extractDestinationDir,
-                bqServices);
+                getBigQueryServices());
       } else {
         ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
         source = BigQueryTableSource.create(
-            jobIdToken, inputTable, extractDestinationDir, bqServices,
+            jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
             StaticValueProvider.of(executingProject));
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -687,7 +684,7 @@ public class BigQueryIO {
                   .setProjectId(executingProject)
                   .setJobId(getExtractJobId(jobIdToken));
 
-              Job extractJob = bqServices.getJobService(bqOptions)
+              Job extractJob = getBigQueryServices().getJobService(bqOptions)
                   .getJob(jobRef);
 
               Collection<String> extractFiles = null;
@@ -1390,7 +1387,7 @@ public class BigQueryIO {
     @Nullable abstract String getTableDescription();
     /** An option to indicate if table validation is desired. Default is true. */
     abstract boolean getValidate();
-    @Nullable abstract BigQueryServices getBigQueryServices();
+    abstract BigQueryServices getBigQueryServices();
 
     abstract Builder toBuilder();
 
@@ -1650,12 +1647,10 @@ public class BigQueryIO {
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
       // The user specified a table.
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
       if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
 
-        DatasetService datasetService = bqServices.getDatasetService(options);
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1693,7 +1688,7 @@ public class BigQueryIO {
         checkArgument(
             !Strings.isNullOrEmpty(tempLocation),
             "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (bqServices == null) {
+        if (getBigQueryServices() == null) {
           try {
             GcsPath.fromUri(tempLocation);
           } catch (IllegalArgumentException e) {
@@ -1711,19 +1706,11 @@ public class BigQueryIO {
     public PDone expand(PCollection<TableRow> input) {
       Pipeline p = input.getPipeline();
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-      BigQueryServices bqServices =
-          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
       if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
-        return input.apply(
-            new StreamWithDeDup(getTable(), getTableRefFunction(),
-                getJsonSchema() == null ? null : NestedValueProvider.of(
-                    getJsonSchema(), new JsonSchemaToTableSchema()),
-                getCreateDisposition(),
-                getTableDescription(),
-                bqServices));
+        return input.apply(new StreamWithDeDup(this));
       }
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -1786,7 +1773,7 @@ public class BigQueryIO {
           .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
               false,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -1800,7 +1787,7 @@ public class BigQueryIO {
           .apply("TempTablesView", View.<String>asIterable());
       singleton.apply(ParDo
           .of(new WriteRename(
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
               getWriteDisposition(),
@@ -1814,7 +1801,7 @@ public class BigQueryIO {
           .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
           .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
               true,
-              bqServices,
+              getBigQueryServices(),
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
@@ -2740,25 +2727,11 @@ public class BigQueryIO {
    * it leverages BigQuery's best-effort de-dup mechanism.
    */
   private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
-    @Nullable private final transient ValueProvider<TableReference> tableReference;
-    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-    @Nullable private final transient ValueProvider<TableSchema> tableSchema;
-    private final Write.CreateDisposition createDisposition;
-    private final BigQueryServices bqServices;
-    @Nullable private final String tableDescription;
+    private final Write write;
 
     /** Constructor. */
-    StreamWithDeDup(ValueProvider<TableReference> tableReference,
-                    @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-                    @Nullable ValueProvider<TableSchema> tableSchema,
-                    Write.CreateDisposition createDisposition,
-                    @Nullable String tableDescription, BigQueryServices bqServices) {
-      this.tableReference = tableReference;
-      this.tableRefFunction = tableRefFunction;
-      this.tableSchema = tableSchema;
-      this.createDisposition = createDisposition;
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.tableDescription = tableDescription;
+    StreamWithDeDup(Write write) {
+      this.write = write;
     }
 
     @Override
@@ -2780,20 +2753,26 @@ public class BigQueryIO {
 
       PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(
           new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class),
-              tableReference, tableRefFunction)));
+              write.getTable(), write.getTableRefFunction())));
 
       // To prevent having the same TableRow processed more than once with regenerated
       // different unique ids, this implementation relies on "checkpointing", which is
       // achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
       // performed by Reshuffle.
+      NestedValueProvider<TableSchema, String> schema =
+          write.getJsonSchema() == null
+              ? null
+              : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(
-              tableSchema,
-              createDisposition,
-              tableDescription,
-              bqServices)));
+          .apply(
+              ParDo.of(
+                  new StreamingWriteFn(
+                      schema,
+                      write.getCreateDisposition(),
+                      write.getTableDescription(),
+                      write.getBigQueryServices())));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user
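
The "checkpointing" comment in the hunk above carries the whole dedup argument: ids are assigned randomly per element, and the Reshuffle (a GroupByKey underneath) materializes them before the writer runs, so a retried writer observes the same ids instead of freshly regenerated ones. A minimal self-contained sketch of that tag-then-reshuffle idiom, assuming an existing PCollection<String> named lines (the DoFns here are illustrative, and the Reshuffle import path has moved between Beam releases):

    import java.util.UUID;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    // In older Beam releases Reshuffle lived under org.apache.beam.sdk.util.
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<KV<String, String>> tagged =
        lines.apply("TagWithIds", ParDo.of(new DoFn<String, KV<String, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Fresh random id per element; it would differ if this step re-executed.
            c.output(KV.of(UUID.randomUUID().toString(), c.element()));
          }
        }));

    tagged
        .apply(Reshuffle.<String, String>of())  // GBK barrier: ids become stable
        .apply("LogDeduped", ParDo.of(new DoFn<KV<String, String>, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Stand-in for the real writer; the key is now stable across retries.
            System.out.println(c.element().getKey());
          }
        }));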


[06/10] beam git commit: Use AutoValue for BigQueryIO.Write

Posted by tg...@apache.org.
Use AutoValue for BigQueryIO.Write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c715896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c715896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c715896

Branch: refs/heads/master
Commit: 5c715896e683f7bec9f150ea17c78d1dae00ee4c
Parents: 32cba32
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:14:39 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:30 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 314 ++++++-------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   8 +-
 2 files changed, 108 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
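
This is the Write-side counterpart of [05/10]: the nine-argument private constructor disappears and each with* method becomes a toBuilder() one-liner. Call-site behavior is unchanged; the diff's builder() centralizes the defaults (CREATE_IF_NEEDED, WRITE_EMPTY, validation on), and every with* call returns a modified copy without mutating the receiver. A short sketch against the post-change API (table spec illustrative):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

    // base keeps the defaults set in builder(); each with* call copies.
    BigQueryIO.Write base = BigQueryIO.Write.to("example-project:somedataset.sometable");
    BigQueryIO.Write tuned =
        base.withWriteDisposition(WriteDisposition.WRITE_APPEND).withoutValidation();
    // base is still WRITE_EMPTY / CREATE_IF_NEEDED with validation enabled.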


http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e5db60e..d2f6ba6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -471,9 +471,9 @@ public class BigQueryIO {
       abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
       abstract Builder setQuery(ValueProvider<String> query);
       abstract Builder setValidate(boolean validate);
-      abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
-      abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
-      abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+      abstract Builder setFlattenResults(Boolean flattenResults);
+      abstract Builder setUseLegacySql(Boolean useLegacySql);
+      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
 
       abstract Read build();
     }
@@ -1364,10 +1364,13 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
+    @VisibleForTesting
     // Maximum number of files in a single partition.
     static final int MAX_NUM_FILES = 10000;
 
+    @VisibleForTesting
     // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
     static final long MAX_SIZE_BYTES = 11 * (1L << 40);
 
@@ -1378,28 +1381,41 @@ public class BigQueryIO {
     // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
     private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-    @Nullable private final ValueProvider<String> jsonTableRef;
-
-    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
-    // Table schema. The schema is required only if the table does not exist.
-    @Nullable private final ValueProvider<String> jsonSchema;
-
-    // Options for creating the table. Valid values are CREATE_IF_NEEDED and
-    // CREATE_NEVER.
-    final CreateDisposition createDisposition;
+    @Nullable abstract ValueProvider<String> getJsonTableRef();
+    @Nullable abstract SerializableFunction<BoundedWindow, TableReference> getTableRefFunction();
+    /** Table schema. The schema is required only if the table does not exist. */
+    @Nullable abstract ValueProvider<String> getJsonSchema();
+    abstract CreateDisposition getCreateDisposition();
+    abstract WriteDisposition getWriteDisposition();
+    @Nullable abstract String getTableDescription();
+    /** An option to indicate if table validation is desired. Default is true. */
+    abstract boolean getValidate();
+    @Nullable abstract BigQueryServices getBigQueryServices();
 
-    // Options for writing to the table. Valid values are WRITE_TRUNCATE,
-    // WRITE_APPEND and WRITE_EMPTY.
-    final WriteDisposition writeDisposition;
+    abstract Builder toBuilder();
 
-    @Nullable
-    final String tableDescription;
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+      abstract Builder setTableRefFunction(
+          SerializableFunction<BoundedWindow, TableReference> tableRefFunction);
+      abstract Builder setJsonSchema(ValueProvider<String> jsonSchema);
+      abstract Builder setCreateDisposition(CreateDisposition createDisposition);
+      abstract Builder setWriteDisposition(WriteDisposition writeDisposition);
+      abstract Builder setTableDescription(String tableDescription);
+      abstract Builder setValidate(boolean validate);
+      abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
 
-    // An option to indicate if table validation is desired. Default is true.
-    final boolean validate;
+      abstract Write build();
+    }
 
-    @Nullable private BigQueryServices bigQueryServices;
+    private static Builder builder() {
+      return new AutoValue_BigQueryIO_Write.Builder()
+          .setValidate(true)
+          .setBigQueryServices(new BigQueryServicesImpl())
+          .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+          .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
+    }
 
     /**
      * An enumeration type for the BigQuery create disposition strings.
@@ -1474,17 +1490,22 @@ public class BigQueryIO {
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
      */
     public static Write to(String tableSpec) {
-      return new Write().withTableSpec(tableSpec);
+      return to(StaticValueProvider.of(tableSpec));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(ValueProvider<String> tableSpec) {
-      return new Write().withTableSpec(tableSpec);
+    public static Write to(TableReference table) {
+      return to(StaticValueProvider.of(toTableSpec(table)));
     }
 
     /** Creates a write transformation for the given table. */
-    public static Write to(TableReference table) {
-      return new Write().withTableRef(table);
+    public static Write to(ValueProvider<String> tableSpec) {
+      return builder()
+          .setJsonTableRef(
+              NestedValueProvider.of(
+                  NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+                  new TableRefToJson()))
+          .build();
     }
 
     /**
@@ -1499,7 +1520,7 @@ public class BigQueryIO {
      * always return the same table specification.
      */
     public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return new Write().withTableSpec(tableSpecFunction);
+      return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
     }
 
     /**
@@ -1511,7 +1532,7 @@ public class BigQueryIO {
      */
     private static Write toTableReference(
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Write().withTableRef(tableRefFunction);
+      return builder().setTableRefFunction(tableRefFunction).build();
     }
 
     private static class TranslateTableSpecFunction implements
@@ -1528,109 +1549,6 @@ public class BigQueryIO {
       }
     }
 
-    private Write() {
-      this(
-          null /* name */,
-          null /* jsonTableRef */,
-          null /* tableRefFunction */,
-          null /* jsonSchema */,
-          CreateDisposition.CREATE_IF_NEEDED,
-          WriteDisposition.WRITE_EMPTY,
-          null /* tableDescription */,
-          true /* validate */,
-          null /* bigQueryServices */);
-    }
-
-    private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
-        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        @Nullable ValueProvider<String> jsonSchema,
-        CreateDisposition createDisposition,
-        WriteDisposition writeDisposition,
-        @Nullable String tableDescription,
-        boolean validate,
-        @Nullable BigQueryServices bigQueryServices) {
-      super(name);
-      this.jsonTableRef = jsonTableRef;
-      this.tableRefFunction = tableRefFunction;
-      this.jsonSchema = jsonSchema;
-      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-      this.tableDescription = tableDescription;
-      this.validate = validate;
-      this.bigQueryServices = bigQueryServices;
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table. Refer to
-     * {@link #parseTableSpec(String)} for the specification format.
-     *
-     * <p>Does not modify this object.
-     */
-    private Write withTableSpec(String tableSpec) {
-      return withTableRef(NestedValueProvider.of(
-          StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableRef(TableReference table) {
-      return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table. Refer to
-     * {@link #parseTableSpec(String)} for the specification format.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableSpec(ValueProvider<String> tableSpec) {
-      return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but writing to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    private Write withTableRef(ValueProvider<TableReference> table) {
-      return new Write(name,
-          NestedValueProvider.of(table, new TableRefToJson()),
-          tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, bigQueryServices);
-    }
-
-    /**
-     * Returns a copy of this write transformation, but using the specified function to determine
-     * which table to write to for each window.
-     *
-     * <p>Does not modify this object.
-     *
-     * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
-     * should always return the same table specification.
-     */
-    private Write withTableSpec(
-        SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
-    }
-
-    /**
-     * Returns a copy of this write transformation, but using the specified function to determine
-     * which table to write to for each window.
-     *
-     * <p>Does not modify this object.
-     *
-     * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-     * always return the same table reference.
-     */
-    private Write withTableRef(
-        SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, bigQueryServices);
-    }
-
     /**
      * Returns a copy of this write transformation, but using the specified schema for rows
      * to be written.
@@ -1638,18 +1556,18 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withSchema(TableSchema schema) {
-      return new Write(name, jsonTableRef, tableRefFunction,
-          StaticValueProvider.of(toJsonString(schema)),
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder()
+          .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+          .build();
     }
 
     /**
      * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
      */
     public Write withSchema(ValueProvider<TableSchema> schema) {
-      return new Write(name, jsonTableRef, tableRefFunction,
-          NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder()
+          .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()))
+          .build();
     }
 
     /**
@@ -1658,8 +1576,7 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withCreateDisposition(CreateDisposition createDisposition) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder().setCreateDisposition(createDisposition).build();
     }
 
     /**
@@ -1668,8 +1585,7 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withWriteDisposition(WriteDisposition writeDisposition) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      return toBuilder().setWriteDisposition(writeDisposition).build();
     }
 
     /**
@@ -1677,9 +1593,8 @@ public class BigQueryIO {
      *
      * <p>Does not modify this object.
      */
-    public Write withTableDescription(@Nullable String tableDescription) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
-          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    public Write withTableDescription(String tableDescription) {
+      return toBuilder().setTableDescription(tableDescription).build();
     }
 
     /**
@@ -1688,14 +1603,12 @@ public class BigQueryIO {
      * <p>Does not modify this object.
      */
     public Write withoutValidation() {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, false, bigQueryServices);
+      return toBuilder().setValidate(false).build();
     }
 
     @VisibleForTesting
     Write withTestServices(BigQueryServices testServices) {
-      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, tableDescription, validate, testServices);
+      return toBuilder().setBigQueryServices(testServices).build();
     }
 
     private static void verifyTableNotExistOrEmpty(
@@ -1724,23 +1637,25 @@ public class BigQueryIO {
 
       // Exactly one of the table reference and the table function can be configured.
       checkState(
-          jsonTableRef != null || tableRefFunction != null,
+          getJsonTableRef() != null || getTableRefFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
       checkState(
-          jsonTableRef == null || tableRefFunction == null,
+          getJsonTableRef() == null || getTableRefFunction() == null,
           "Cannot set both a table reference and a table function for a BigQueryIO.Write"
               + " transform");
 
       // Require a schema if creating one or more tables.
       checkArgument(
-          createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null,
           "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
 
       // The user specified a table.
-      if (jsonTableRef != null && validate) {
+      BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
+      if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
 
-        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+        DatasetService datasetService = bqServices.getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
         // Note that a presence check can fail when the table or dataset is created by an earlier
         // stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1754,22 +1669,22 @@ public class BigQueryIO {
         }
       }
 
-      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
         // We will use BigQuery's streaming write API -- validate supported dispositions.
-        if (tableRefFunction != null) {
+        if (getTableRefFunction() != null) {
           checkArgument(
-              createDisposition != CreateDisposition.CREATE_NEVER,
+              getCreateDisposition() != CreateDisposition.CREATE_NEVER,
               "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
               + " function.");
         }
-        if (jsonSchema == null) {
+        if (getJsonSchema() == null) {
           checkArgument(
-              createDisposition == CreateDisposition.CREATE_NEVER,
+              getCreateDisposition() == CreateDisposition.CREATE_NEVER,
               "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
         }
 
         checkArgument(
-            writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+            getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
             "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
                 + " when using a tablespec function.");
       } else {
@@ -1778,7 +1693,7 @@ public class BigQueryIO {
         checkArgument(
             !Strings.isNullOrEmpty(tempLocation),
             "BigQueryIO.Write needs a GCS temp location to store temp files.");
-        if (bigQueryServices == null) {
+        if (bqServices == null) {
           try {
             GcsPath.fromUri(tempLocation);
           } catch (IllegalArgumentException e) {
@@ -1796,17 +1711,18 @@ public class BigQueryIO {
     public PDone expand(PCollection<TableRow> input) {
       Pipeline p = input.getPipeline();
       BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-      BigQueryServices bqServices = getBigQueryServices();
+      BigQueryServices bqServices =
+          MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
 
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
-      if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+      if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
         return input.apply(
-            new StreamWithDeDup(getTable(), tableRefFunction,
-                jsonSchema == null ? null : NestedValueProvider.of(
-                    jsonSchema, new JsonSchemaToTableSchema()),
-                createDisposition,
-                tableDescription,
+            new StreamWithDeDup(getTable(), getTableRefFunction(),
+                getJsonSchema() == null ? null : NestedValueProvider.of(
+                    getJsonSchema(), new JsonSchemaToTableSchema()),
+                getCreateDisposition(),
+                getTableDescription(),
                 bqServices));
       }
 
@@ -1874,10 +1790,10 @@ public class BigQueryIO {
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
-              jsonSchema,
+              getJsonSchema(),
               WriteDisposition.WRITE_EMPTY,
               CreateDisposition.CREATE_IF_NEEDED,
-              tableDescription))
+              getTableDescription()))
           .withSideInputs(jobIdTokenView));
 
       PCollectionView<Iterable<String>> tempTablesView = tempTables
@@ -1887,10 +1803,10 @@ public class BigQueryIO {
               bqServices,
               jobIdTokenView,
               NestedValueProvider.of(table, new TableRefToJson()),
-              writeDisposition,
-              createDisposition,
+              getWriteDisposition(),
+              getCreateDisposition(),
               tempTablesView,
-              tableDescription))
+              getTableDescription()))
           .withSideInputs(tempTablesView, jobIdTokenView));
 
       // Write single partition to final table
@@ -1902,10 +1818,10 @@ public class BigQueryIO {
               jobIdTokenView,
               tempFilePrefix,
               NestedValueProvider.of(table, new TableRefToJson()),
-              jsonSchema,
-              writeDisposition,
-              createDisposition,
-              tableDescription))
+              getJsonSchema(),
+              getWriteDisposition(),
+              getCreateDisposition(),
+              getTableDescription()))
           .withSideInputs(jobIdTokenView));
 
       return PDone.in(input.getPipeline());
@@ -1969,41 +1885,31 @@ public class BigQueryIO {
       super.populateDisplayData(builder);
 
       builder
-          .addIfNotNull(DisplayData.item("table", jsonTableRef)
+          .addIfNotNull(DisplayData.item("table", getJsonTableRef())
             .withLabel("Table Reference"))
-          .addIfNotNull(DisplayData.item("schema", jsonSchema)
+          .addIfNotNull(DisplayData.item("schema", getJsonSchema())
             .withLabel("Table Schema"));
 
-      if (tableRefFunction != null) {
-        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+      if (getTableRefFunction() != null) {
+        builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
           .withLabel("Table Reference Function"));
       }
 
       builder
-          .add(DisplayData.item("createDisposition", createDisposition.toString())
+          .add(DisplayData.item("createDisposition", getCreateDisposition().toString())
             .withLabel("Table CreateDisposition"))
-          .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+          .add(DisplayData.item("writeDisposition", getWriteDisposition().toString())
             .withLabel("Table WriteDisposition"))
-          .addIfNotDefault(DisplayData.item("validation", validate)
+          .addIfNotDefault(DisplayData.item("validation", getValidate())
             .withLabel("Validation Enabled"), true)
-          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+          .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
             .withLabel("Table Description"));
     }
 
-    /** Returns the create disposition. */
-    public CreateDisposition getCreateDisposition() {
-      return createDisposition;
-    }
-
-    /** Returns the write disposition. */
-    public WriteDisposition getWriteDisposition() {
-      return writeDisposition;
-    }
-
     /** Returns the table schema. */
     public TableSchema getSchema() {
       return fromJsonString(
-          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+          getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
     }
 
     /**
@@ -2036,20 +1942,8 @@ public class BigQueryIO {
     /** Returns the table reference, or {@code null}. */
     @Nullable
     public ValueProvider<TableReference> getTable() {
-      return jsonTableRef == null ? null :
-          NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
-    }
-
-    /** Returns {@code true} if table validation is enabled. */
-    public boolean getValidate() {
-      return validate;
-    }
-
-    private BigQueryServices getBigQueryServices() {
-      if (bigQueryServices == null) {
-        bigQueryServices = new BigQueryServicesImpl();
-      }
-      return bigQueryServices;
+      return getJsonTableRef() == null ? null :
+          NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
     }
 
     static class TableRowWriter {

http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fdaa81c..888d9c1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -712,10 +712,10 @@ public class BigQueryIOTest implements Serializable {
     assertEquals(dataset, write.getTable().get().getDatasetId());
     assertEquals(table, write.getTable().get().getTableId());
     assertEquals(schema, write.getSchema());
-    assertEquals(createDisposition, write.createDisposition);
-    assertEquals(writeDisposition, write.writeDisposition);
-    assertEquals(tableDescription, write.tableDescription);
-    assertEquals(validate, write.validate);
+    assertEquals(createDisposition, write.getCreateDisposition());
+    assertEquals(writeDisposition, write.getWriteDisposition());
+    assertEquals(tableDescription, write.getTableDescription());
+    assertEquals(validate, write.getValidate());
   }
 
   @Before