Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:55 UTC
[01/10] beam git commit: Remove two unused fields
Repository: beam
Updated Branches:
refs/heads/master cc12fd378 -> 806c53c18
Remove two unused fields
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30f36344
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30f36344
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30f36344
Branch: refs/heads/master
Commit: 30f363444679293727171f2417c9d991a4bf7852
Parents: cc12fd3
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:54:55 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:19 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/30f36344/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 0e1c6fc..2902c2b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -130,7 +130,6 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1189,15 +1188,9 @@ public class BigQueryIO {
* ...
*/
private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
- // The maximum number of retries to verify temp files.
- private static final int MAX_FILES_VERIFY_RETRIES = 9;
-
// The maximum number of retries to poll a BigQuery job.
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
- // The initial backoff for verifying temp files.
- private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds(1);
-
protected final ValueProvider<String> jobIdToken;
protected final String extractDestinationDir;
protected final BigQueryServices bqServices;
[08/10] beam git commit: Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
Posted by tg...@apache.org.
Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a252a77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a252a77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a252a77
Branch: refs/heads/master
Commit: 1a252a771127febe551fda5d499c7ecb3b95cf23
Parents: 1adcbae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:15:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:36 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/cookbook/FilterExamples.java | 2 +-
.../beam/examples/cookbook/JoinExamples.java | 4 +--
.../examples/cookbook/MaxPerKeyExamples.java | 2 +-
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++++++-------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++----------
8 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
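For illustration only (not part of the commit), the migration this change requires of user code is mechanical: the static entry point BigQueryIO.Read.from(...) becomes a chain starting from the new BigQueryIO.read() factory method. A minimal before/after sketch, assuming a Pipeline p:

    // Before this change:
    PCollection<TableRow> rows = p.apply(
        BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));

    // After this change:
    PCollection<TableRow> rows = p.apply(
        BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));

The fromQuery(...), withoutValidation(), and similar configuration methods chain the same way off read(), as the diffs below show.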
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 079674a..d3c9167 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -156,7 +156,7 @@ public class BigQueryTornadoes {
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new CountTornadoes())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 37f9d79..fc54b13 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -200,7 +200,7 @@ public class CombinePerKeyExamples {
fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new PlaysForWord())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index fb6b507..714a8f2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -238,7 +238,7 @@ public class FilterExamples {
TableSchema schema = buildWeatherSchemaProjection();
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(ParDo.of(new ProjectionFn()))
.apply(new BelowGlobalMean(options.getMonthFilter()))
.apply(BigQueryIO.Write
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 7cf0942..05a3ad3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -166,8 +166,8 @@ public class JoinExamples {
Pipeline p = Pipeline.create(options);
// the following two 'applys' create multiple inputs to our pipeline, one for each
// of our two input sources.
- PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
- PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+ PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
+ PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
formattedResults.apply(TextIO.Write.to(options.getOutput()));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index eabc42b..7e7bc72 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -149,7 +149,7 @@ public class MaxPerKeyExamples {
fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new MaxMeanTemp())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c4ff158..c65d7dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -24,7 +24,7 @@
* from existing storage:
* <pre>{@code
* PCollection<TableRow> inputData = pipeline.apply(
- * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
* and {@code Write} transforms that persist PCollections to external storage:
* <pre> {@code
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e039c8c..dfb7ea6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
* This produces a {@link PCollection} of {@link TableRow TableRows} as output:
* <pre>{@code
* PCollection<TableRow> weatherData = pipeline.apply(
- * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
*
* <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -177,7 +177,7 @@ import org.slf4j.LoggerFactory;
*
* <pre>{@code
* PCollection<TableRow> meanTemperatureData = pipeline.apply(
- * BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
+ * BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
* }</pre>
*
* <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -454,6 +454,14 @@ public class BigQueryIO {
* }
* }}</pre>
*/
+ public static Read read() {
+ return new AutoValue_BigQueryIO_Read.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .build();
+ }
+
+ /** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
@Nullable abstract ValueProvider<String> getJsonTableRef();
@@ -477,25 +485,26 @@ public class BigQueryIO {
abstract Read build();
}
- private static Builder builder() {
- return new AutoValue_BigQueryIO_Read.Builder()
- .setValidate(true)
- .setBigQueryServices(new BigQueryServicesImpl());
+ /** Ensures that methods of the from() / fromQuery() family are called at most once. */
+ private void ensureFromNotCalledYet() {
+ checkState(
+ getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
}
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"} for tables within the current project.
*/
- public static Read from(String tableSpec) {
+ public Read from(String tableSpec) {
return from(StaticValueProvider.of(tableSpec));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
- public static Read from(ValueProvider<String> tableSpec) {
- return builder()
+ public Read from(ValueProvider<String> tableSpec) {
+ ensureFromNotCalledYet();
+ return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -505,21 +514,22 @@ public class BigQueryIO {
/**
* Reads results received after executing the given query.
*/
- public static Read fromQuery(String query) {
+ public Read fromQuery(String query) {
return fromQuery(StaticValueProvider.of(query));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
- public static Read fromQuery(ValueProvider<String> query) {
- return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+ public Read fromQuery(ValueProvider<String> query) {
+ ensureFromNotCalledYet();
+ return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
}
/**
* Reads a BigQuery table specified as a {@link TableReference} object.
*/
- public static Read from(TableReference table) {
+ public Read from(TableReference table) {
return from(StaticValueProvider.of(toTableSpec(table)));
}
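A note for illustration (not in the patch text itself): since from() and fromQuery() are now instance methods that return a modified copy via toBuilder(), ensureFromNotCalledYet() makes specifying two sources on one Read fail fast. A hypothetical misuse, based on the checkState message above:

    BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
    // Specifying a second source on the same transform would throw:
    // read.fromQuery("SELECT ...");
    //   -> IllegalStateException: "from() or fromQuery() already called"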
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 888d9c1..f6a7fb4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -727,13 +727,13 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildTableBasedSource() {
- BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+ BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedSource() {
- BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+ BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query");
checkReadQueryObject(read, "foo_query");
}
@@ -742,7 +742,7 @@ public class BigQueryIOTest implements Serializable {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read =
- BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
+ BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation();
checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
}
@@ -751,14 +751,14 @@ public class BigQueryIOTest implements Serializable {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read =
- BigQueryIO.Read.fromQuery("some_query").withoutValidation();
+ BigQueryIO.read().fromQuery("some_query").withoutValidation();
checkReadQueryObjectWithValidate(read, "some_query", false);
}
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
BigQueryIO.Read read =
- BigQueryIO.Read.from("somedataset.sometable");
+ BigQueryIO.read().from("somedataset.sometable");
checkReadTableObject(read, null, "somedataset", "sometable");
}
@@ -768,7 +768,7 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Read read = BigQueryIO.Read.from(table);
+ BigQueryIO.Read read = BigQueryIO.read().from(table);
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@@ -800,7 +800,7 @@ public class BigQueryIOTest implements Serializable {
thrown.expect(RuntimeException.class);
// Message will be one of following depending on the execution environment.
thrown.expectMessage(Matchers.containsString("Unsupported"));
- p.apply(BigQueryIO.Read.from(tableRef)
+ p.apply(BigQueryIO.read().from(tableRef)
.withTestServices(fakeBqServices));
}
@@ -817,7 +817,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply("ReadMyTable",
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.withoutResultFlattening());
p.run();
@@ -836,7 +836,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply(
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.withoutValidation()
.withoutResultFlattening());
@@ -856,7 +856,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
+ " which only applies to queries");
p.apply(
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.usingStandardSql());
p.run();
@@ -929,7 +929,7 @@ public class BigQueryIOTest implements Serializable {
Pipeline p = TestPipeline.create(bqOptions);
PCollection<String> output = p
- .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
+ .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
.apply(ParDo.of(new DoFn<TableRow, String>() {
@@ -1260,7 +1260,7 @@ public class BigQueryIOTest implements Serializable {
public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.from(tableSpec)
.withoutResultFlattening()
.usingStandardSql()
@@ -1276,7 +1276,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSourceDisplayDataQuery() {
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.fromQuery("myQuery")
.withoutResultFlattening()
.usingStandardSql()
@@ -1295,7 +1295,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.from("project:dataset.tableId")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -1312,7 +1312,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.fromQuery("foobar")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBigQueryIOGetName() {
- assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
+ assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
}
@@ -2317,7 +2317,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read read = BigQueryIO.Read.from(
+ BigQueryIO.Read read = BigQueryIO.read().from(
options.getInputTable()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.
@@ -2330,7 +2330,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
+ BigQueryIO.Read read = BigQueryIO.read().fromQuery(
options.getInputQuery()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.
[04/10] beam git commit: Makes NameUtils recognize AutoValue classes
Posted by tg...@apache.org.
Makes NameUtils recognize AutoValue classes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6b3e112
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6b3e112
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6b3e112
Branch: refs/heads/master
Commit: d6b3e112870b095d96f6f8d6bc165b88eb735952
Parents: 7d1f440
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 20:05:01 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/util/NameUtils.java | 5 +++++
.../java/org/apache/beam/sdk/util/NameUtilsTest.java | 12 ++++++++++++
2 files changed, 17 insertions(+)
----------------------------------------------------------------------
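For context, an illustrative sketch (not part of the patch): AutoValue generates a concrete subclass whose simple name prefixes the user's class (and its enclosing classes) with AutoValue_, so delegating to getSuperclass() recovers the name the user actually declared. Using the SomeTransform test class added below:

    // AutoValue generates AutoValue_NameUtilsTest_SomeTransform extends SomeTransform;
    // the new branch recurses on the superclass, yielding the user-declared name:
    NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class);
    // -> "NameUtilsTest.SomeTransform"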
http://git-wip-us.apache.org/repos/asf/beam/blob/d6b3e112/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
index c67ccca..3f3054a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java
@@ -151,6 +151,8 @@ public class NameUtils {
* <ul>
* <li>1. It keeps the outer classes names.
* <li>2. It removes the common transform inner class: "Bound".
+ * <li>3. For classes generated by AutoValue, whose names start with AutoValue_,
+ * it delegates to the (parent) class declared in the user's source code.
* </ul>
*
* <p>Examples:
@@ -161,6 +163,9 @@ public class NameUtils {
*/
public static String approximatePTransformName(Class<?> clazz) {
checkArgument(PTransform.class.isAssignableFrom(clazz));
+ if (clazz.getSimpleName().startsWith("AutoValue_")) {
+ return approximatePTransformName(clazz.getSuperclass());
+ }
return approximateSimpleName(clazz, /* dropOuterClassNames */ false)
.replaceFirst("\\.Bound$", "");
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d6b3e112/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index 6848ea4..c685a63 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import static org.junit.Assert.assertEquals;
+import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils.NameOverride;
@@ -133,9 +134,20 @@ public class NameUtilsTest {
assertEquals(
"NameUtilsTest.EmbeddedPTransform",
NameUtils.approximatePTransformName(transform.getBound().getClass()));
+ assertEquals(
+ "NameUtilsTest.SomeTransform",
+ NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
}
+ @AutoValue
+ abstract static class SomeTransform extends PTransform<PBegin, PDone> {
+ @Override
+ public PDone expand(PBegin input) {
+ return null;
+ }
+ }
+
@Test
public void testPTransformNameWithAnonOuterClass() throws Exception {
AnonymousClass anonymousClassObj = new AnonymousClass() {
[10/10] beam git commit: This closes #2149
Posted by tg...@apache.org.
This closes #2149
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/806c53c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/806c53c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/806c53c1
Branch: refs/heads/master
Commit: 806c53c18793b43e7ccd28f99662ee73a97237b4
Parents: cc12fd3 101715a
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 15:54:42 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:42 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/StreamingWordExtract.java | 2 +-
.../examples/complete/TrafficMaxLaneFlow.java | 2 +-
.../beam/examples/complete/TrafficRoutes.java | 2 +-
.../examples/cookbook/BigQueryTornadoes.java | 4 +-
.../cookbook/CombinePerKeyExamples.java | 4 +-
.../beam/examples/cookbook/FilterExamples.java | 4 +-
.../beam/examples/cookbook/JoinExamples.java | 4 +-
.../examples/cookbook/MaxPerKeyExamples.java | 4 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../org/apache/beam/sdk/util/NameUtils.java | 5 +
.../org/apache/beam/sdk/util/NameUtilsTest.java | 12 +
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1707 ++++++++----------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 288 ++-
17 files changed, 851 insertions(+), 1197 deletions(-)
----------------------------------------------------------------------
[02/10] beam git commit: Condense BigQueryIO.Read.Bound into BigQueryIO.Read
Posted by tg...@apache.org.
Condense BigQueryIO.Read.Bound into BigQueryIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/825338aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/825338aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/825338aa
Branch: refs/heads/master
Commit: 825338aaa5d7e5ead1afa13f63c65fb316e1aa6a
Parents: 30f3634
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:15:47 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:22 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 690 +++++++++----------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 117 ++--
2 files changed, 359 insertions(+), 448 deletions(-)
----------------------------------------------------------------------
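For illustration (not part of the commit): previously the static factory methods on BigQueryIO.Read returned a nested Read.Bound that carried all configuration; this change folds that state into Read itself, so call sites keep the same method chains but the declared type simplifies, as the test diff below shows:

    // Before: configuration lived on the nested Bound class.
    BigQueryIO.Read.Bound bound =
        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();

    // After: Read is itself the PTransform, and the same chain returns Read.
    BigQueryIO.Read read =
        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();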
http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2902c2b..f6c8575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -39,7 +39,6 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -454,448 +453,379 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Read {
+ public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+ @Nullable final ValueProvider<String> jsonTableRef;
+ @Nullable final ValueProvider<String> query;
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"} for tables within the current project.
*/
- public static Bound from(String tableSpec) {
- return new Bound().from(StaticValueProvider.of(tableSpec));
+ public static Read from(String tableSpec) {
+ return new Read().from(StaticValueProvider.of(tableSpec));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
- public static Bound from(ValueProvider<String> tableSpec) {
- return new Bound().from(tableSpec);
+ public static Read from(ValueProvider<String> tableSpec) {
+ return new Read().from(tableSpec);
}
/**
* Reads results received after executing the given query.
*/
- public static Bound fromQuery(String query) {
- return new Bound().fromQuery(StaticValueProvider.of(query));
+ public static Read fromQuery(String query) {
+ return new Read().fromQuery(StaticValueProvider.of(query));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
- public static Bound fromQuery(ValueProvider<String> query) {
- return new Bound().fromQuery(query);
+ public static Read fromQuery(ValueProvider<String> query) {
+ return new Read().fromQuery(query);
}
/**
* Reads a BigQuery table specified as a {@link TableReference} object.
*/
- public static Bound from(TableReference table) {
- return new Bound().from(table);
+ public static Read from(TableReference table) {
+ return new Read().from(table);
}
/**
- * Disables BigQuery table validation, which is enabled by default.
+ * Disable validation that the table exists or the query succeeds prior to pipeline
+ * submission. Basic validation (such as ensuring that a query or table is specified) still
+ * occurs.
*/
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
+ final boolean validate;
+ @Nullable final Boolean flattenResults;
+ @Nullable final Boolean useLegacySql;
+ @Nullable BigQueryServices bigQueryServices;
+
+ @VisibleForTesting @Nullable String stepUuid;
+ @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
+ private static final String QUERY_VALIDATION_FAILURE_ERROR =
+ "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ + " pipeline, This validation can be disabled using #withoutValidation.";
+
+ private Read() {
+ this(
+ null /* name */,
+ null /* query */,
+ null /* jsonTableRef */,
+ true /* validate */,
+ null /* flattenResults */,
+ null /* useLegacySql */,
+ null /* bigQueryServices */);
+ }
+
+ private Read(
+ String name, @Nullable ValueProvider<String> query,
+ @Nullable ValueProvider<String> jsonTableRef, boolean validate,
+ @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
+ @Nullable BigQueryServices bigQueryServices) {
+ super(name);
+ this.jsonTableRef = jsonTableRef;
+ this.query = query;
+ this.validate = validate;
+ this.flattenResults = flattenResults;
+ this.useLegacySql = useLegacySql;
+ this.bigQueryServices = bigQueryServices;
}
/**
- * A {@link PTransform} that reads from a BigQuery table and returns a bounded
- * {@link PCollection} of {@link TableRow TableRows}.
+ * Disable validation that the table exists or the query succeeds prior to pipeline
+ * submission. Basic validation (such as ensuring that a query or table is specified) still
+ * occurs.
*/
- public static class Bound extends PTransform<PBegin, PCollection<TableRow>> {
- @Nullable final ValueProvider<String> jsonTableRef;
- @Nullable final ValueProvider<String> query;
-
- /**
- * Disable validation that the table exists or the query succeeds prior to pipeline
- * submission. Basic validation (such as ensuring that a query or table is specified) still
- * occurs.
- */
- final boolean validate;
- @Nullable final Boolean flattenResults;
- @Nullable final Boolean useLegacySql;
- @Nullable BigQueryServices bigQueryServices;
-
- @VisibleForTesting @Nullable String stepUuid;
- @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
- private static final String QUERY_VALIDATION_FAILURE_ERROR =
- "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
- + " pipeline, This validation can be disabled using #withoutValidation.";
-
- private Bound() {
- this(
- null /* name */,
- null /* query */,
- null /* jsonTableRef */,
- true /* validate */,
- null /* flattenResults */,
- null /* useLegacySql */,
- null /* bigQueryServices */);
- }
-
- private Bound(
- String name, @Nullable ValueProvider<String> query,
- @Nullable ValueProvider<String> jsonTableRef, boolean validate,
- @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.query = query;
- this.validate = validate;
- this.flattenResults = flattenResults;
- this.useLegacySql = useLegacySql;
- this.bigQueryServices = bigQueryServices;
- }
-
- /**
- * Returns a copy of this transform that reads from the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Bound from(ValueProvider<String> tableSpec) {
- return new Bound(
- name, query,
- NestedValueProvider.of(
- NestedValueProvider.of(
- tableSpec, new TableSpecToTableRef()),
- new TableRefToJson()),
- validate, flattenResults, useLegacySql, bigQueryServices);
- }
-
- /**
- * Returns a copy of this transform that reads from the specified table.
- *
- * <p>Does not modify this object.
- */
- public Bound from(TableReference table) {
- return from(StaticValueProvider.of(toTableSpec(table)));
- }
-
- /**
- * Returns a copy of this transform that reads the results of the specified query.
- *
- * <p>Does not modify this object.
- *
- * <p>By default, the query results will be flattened -- see
- * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
- * Jobs documentation</a> for more information. To disable flattening, use
- * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
- *
- * <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery
- * Standard SQL dialect, use {@link BigQueryIO.Read.Bound#usingStandardSql}.
- */
- public Bound fromQuery(String query) {
- return fromQuery(StaticValueProvider.of(query));
- }
-
- /**
- * Like {@link #fromQuery(String)}, but from a {@link ValueProvider}.
- */
- public Bound fromQuery(ValueProvider<String> query) {
- return new Bound(name, query, jsonTableRef, validate,
- MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
- MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE),
- bigQueryServices);
- }
+ public Read withoutValidation() {
+ return new Read(
+ name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
+ bigQueryServices);
+ }
- /**
- * Disable validation that the table exists or the query succeeds prior to pipeline
- * submission. Basic validation (such as ensuring that a query or table is specified) still
- * occurs.
- */
- public Bound withoutValidation() {
- return new Bound(
- name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
- bigQueryServices);
- }
+ /**
+ * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
+ * flattening of query results</a>.
+ *
+ * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
+ * from a table will cause an error during validation.
+ */
+ public Read withoutResultFlattening() {
+ return new Read(
+ name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
+ bigQueryServices);
+ }
- /**
- * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
- * flattening of query results</a>.
- *
- * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
- * from a table will cause an error during validation.
- */
- public Bound withoutResultFlattening() {
- return new Bound(
- name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
- bigQueryServices);
- }
+ /**
+ * Enables BigQuery's Standard SQL dialect when reading from a query.
+ *
+ * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
+ * from a table will cause an error during validation.
+ */
+ public Read usingStandardSql() {
+ return new Read(
+ name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
+ bigQueryServices);
+ }
- /**
- * Enables BigQuery's Standard SQL dialect when reading from a query.
- *
- * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
- * from a table will cause an error during validation.
- */
- public Bound usingStandardSql() {
- return new Bound(
- name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
- bigQueryServices);
- }
+ @VisibleForTesting
+ Read withTestServices(BigQueryServices testServices) {
+ return new Read(
+ name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+ }
- @VisibleForTesting
- Bound withTestServices(BigQueryServices testServices) {
- return new Bound(
- name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+ @Override
+ public void validate(PBegin input) {
+ // Even if existence validation is disabled, we need to make sure that the BigQueryIO
+ // read is properly specified.
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ String tempLocation = bqOptions.getTempLocation();
+ checkArgument(
+ !Strings.isNullOrEmpty(tempLocation),
+ "BigQueryIO.Read needs a GCS temp location to store temp files.");
+ if (bigQueryServices == null) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+ tempLocation),
+ e);
+ }
}
- @Override
- public void validate(PBegin input) {
- // Even if existence validation is disabled, we need to make sure that the BigQueryIO
- // read is properly specified.
- BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
- String tempLocation = bqOptions.getTempLocation();
- checkArgument(
- !Strings.isNullOrEmpty(tempLocation),
- "BigQueryIO.Read needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
- try {
- GcsPath.fromUri(tempLocation);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
- tempLocation),
- e);
- }
- }
-
- ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
+ checkState(
+ table == null || query == null,
+ "Invalid BigQueryIO.Read: table reference and query may not both be set");
+ checkState(
+ table != null || query != null,
+ "Invalid BigQueryIO.Read: one of table reference and query must be set");
+ if (table != null) {
checkState(
- table == null || query == null,
- "Invalid BigQueryIO.Read: table reference and query may not both be set");
+ flattenResults == null,
+ "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+ + " preference, which only applies to queries");
checkState(
- table != null || query != null,
- "Invalid BigQueryIO.Read: one of table reference and query must be set");
-
- if (table != null) {
- checkState(
- flattenResults == null,
- "Invalid BigQueryIO.Read: Specifies a table with a result flattening"
- + " preference, which only applies to queries");
- checkState(
- useLegacySql == null,
- "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
- + " preference, which only applies to queries");
- } else /* query != null */ {
- checkState(flattenResults != null, "flattenResults should not be null if query is set");
- checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
- }
-
- // Note that a table or query check can fail if the table or dataset are created by
- // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
- // For these cases the withoutValidation method can be used to disable the check.
- if (validate && table != null) {
- checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
- // Check for source table presence for early failure notification.
- DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
- verifyDatasetPresence(datasetService, table.get());
- verifyTablePresence(datasetService, table.get());
- } else if (validate && query != null) {
- checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
- JobService jobService = getBigQueryServices().getJobService(bqOptions);
- try {
- jobService.dryRunQuery(
- bqOptions.getProject(),
- new JobConfigurationQuery()
- .setQuery(query.get())
- .setFlattenResults(flattenResults)
- .setUseLegacySql(useLegacySql));
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
- }
- }
- }
-
- @Override
- public PCollection<TableRow> expand(PBegin input) {
- stepUuid = randomUUIDString();
- BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
- jobUuid = NestedValueProvider.of(
- StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
- final ValueProvider<String> jobIdToken = NestedValueProvider.of(
- jobUuid, new BeamJobUuidToBigQueryJobUuid());
-
- BoundedSource<TableRow> source;
- final BigQueryServices bqServices = getBigQueryServices();
-
- final String extractDestinationDir;
- String tempLocation = bqOptions.getTempLocation();
+ useLegacySql == null,
+ "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ + " preference, which only applies to queries");
+ } else /* query != null */ {
+ checkState(flattenResults != null, "flattenResults should not be null if query is set");
+ checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+ }
+
+ // Note that a table or query check can fail if the table or dataset are created by
+ // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
+ // For these cases the withoutValidation method can be used to disable the check.
+ if (validate && table != null) {
+ checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
+ // Check for source table presence for early failure notification.
+ DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
+ verifyDatasetPresence(datasetService, table.get());
+ verifyTablePresence(datasetService, table.get());
+ } else if (validate && query != null) {
+ checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+ JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- extractDestinationDir = factory.resolve(tempLocation, stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve extract destination directory in %s", tempLocation));
- }
-
- final String executingProject = bqOptions.getProject();
- if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
- source = BigQueryQuerySource.create(
- jobIdToken, query, NestedValueProvider.of(
- jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
- flattenResults, useLegacySql, extractDestinationDir, bqServices);
- } else {
- ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
- source = BigQueryTableSource.create(
- jobIdToken, inputTable, extractDestinationDir, bqServices,
- StaticValueProvider.of(executingProject));
+ jobService.dryRunQuery(
+ bqOptions.getProject(),
+ new JobConfigurationQuery()
+ .setQuery(query.get())
+ .setFlattenResults(flattenResults)
+ .setUseLegacySql(useLegacySql));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
}
- PassThroughThenCleanup.CleanupOperation cleanupOperation =
- new PassThroughThenCleanup.CleanupOperation() {
- @Override
- void cleanup(PipelineOptions options) throws Exception {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject)
- .setJobId(getExtractJobId(jobIdToken));
-
- Job extractJob = bqServices.getJobService(bqOptions)
- .getJob(jobRef);
-
- Collection<String> extractFiles = null;
- if (extractJob != null) {
- extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
- } else {
- IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
- Collection<String> dirMatch = factory.match(extractDestinationDir);
- if (!dirMatch.isEmpty()) {
- extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
- }
- }
- if (extractFiles != null && !extractFiles.isEmpty()) {
- new GcsUtilFactory().create(options).remove(extractFiles);
- }
- }};
- return input.getPipeline()
- .apply(org.apache.beam.sdk.io.Read.from(source))
- .setCoder(getDefaultOutputCoder())
- .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
}
+ }
- @Override
- protected Coder<TableRow> getDefaultOutputCoder() {
- return TableRowJsonCoder.of();
+ @Override
+ public PCollection<TableRow> expand(PBegin input) {
+ stepUuid = randomUUIDString();
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ jobUuid = NestedValueProvider.of(
+ StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
+ final ValueProvider<String> jobIdToken = NestedValueProvider.of(
+ jobUuid, new BeamJobUuidToBigQueryJobUuid());
+
+ BoundedSource<TableRow> source;
+ final BigQueryServices bqServices = getBigQueryServices();
+
+ final String extractDestinationDir;
+ String tempLocation = bqOptions.getTempLocation();
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ extractDestinationDir = factory.resolve(tempLocation, stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve extract destination directory in %s", tempLocation));
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
- .withLabel("Table"))
- .addIfNotNull(DisplayData.item("query", query)
- .withLabel("Query"))
- .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
- .withLabel("Flatten Query Results"))
- .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
- .withLabel("Use Legacy SQL Dialect"))
- .addIfNotDefault(DisplayData.item("validation", validate)
- .withLabel("Validation Enabled"),
- true);
+ final String executingProject = bqOptions.getProject();
+ if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
+ source = BigQueryQuerySource.create(
+ jobIdToken, query, NestedValueProvider.of(
+ jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+ flattenResults, useLegacySql, extractDestinationDir, bqServices);
+ } else {
+ ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
+ source = BigQueryTableSource.create(
+ jobIdToken, inputTable, extractDestinationDir, bqServices,
+ StaticValueProvider.of(executingProject));
}
+ PassThroughThenCleanup.CleanupOperation cleanupOperation =
+ new PassThroughThenCleanup.CleanupOperation() {
+ @Override
+ void cleanup(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(getExtractJobId(jobIdToken));
+
+ Job extractJob = bqServices.getJobService(bqOptions)
+ .getJob(jobRef);
+
+ Collection<String> extractFiles = null;
+ if (extractJob != null) {
+ extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+ } else {
+ IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ Collection<String> dirMatch = factory.match(extractDestinationDir);
+ if (!dirMatch.isEmpty()) {
+ extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+ }
+ }
+ if (extractFiles != null && !extractFiles.isEmpty()) {
+ new GcsUtilFactory().create(options).remove(extractFiles);
+ }
+ }};
+ return input.getPipeline()
+ .apply(org.apache.beam.sdk.io.Read.from(source))
+ .setCoder(getDefaultOutputCoder())
+ .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
+ }
- /**
- * Returns the table to read, or {@code null} if reading from a query instead.
- *
- * <p>If the table's project is not specified, use the executing project.
- */
- @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
- BigQueryOptions bqOptions) {
- ValueProvider<TableReference> table = getTableProvider();
- if (table == null) {
- return table;
- }
- if (!table.isAccessible()) {
- LOG.info("Using a dynamic value for table input. This must contain a project"
- + " in the table reference: {}", table);
- return table;
- }
- if (Strings.isNullOrEmpty(table.get().getProjectId())) {
- // If user does not specify a project we assume the table to be located in
- // the default project.
- TableReference tableRef = table.get();
- tableRef.setProjectId(bqOptions.getProject());
- return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(tableRef)), new JsonTableRefToTableRef());
- }
+ @Override
+ protected Coder<TableRow> getDefaultOutputCoder() {
+ return TableRowJsonCoder.of();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+ .withLabel("Table"))
+ .addIfNotNull(DisplayData.item("query", query)
+ .withLabel("Query"))
+ .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+ .withLabel("Flatten Query Results"))
+ .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+ .withLabel("Use Legacy SQL Dialect"))
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"),
+ true);
+ }
+
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ *
+ * <p>If the table's project is not specified, use the executing project.
+ */
+ @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ BigQueryOptions bqOptions) {
+ ValueProvider<TableReference> table = getTableProvider();
+ if (table == null) {
return table;
}
-
- /**
- * Returns the table to read, or {@code null} if reading from a query instead.
- */
- @Nullable
- public ValueProvider<TableReference> getTableProvider() {
- return jsonTableRef == null
- ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ if (!table.isAccessible()) {
+ LOG.info("Using a dynamic value for table input. This must contain a project"
+ + " in the table reference: {}", table);
+ return table;
}
- /**
- * Returns the table to read, or {@code null} if reading from a query instead.
- */
- @Nullable
- public TableReference getTable() {
- ValueProvider<TableReference> provider = getTableProvider();
- return provider == null ? null : provider.get();
+ if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+ // If user does not specify a project we assume the table to be located in
+ // the default project.
+ TableReference tableRef = table.get();
+ tableRef.setProjectId(bqOptions.getProject());
+ return NestedValueProvider.of(StaticValueProvider.of(
+ toJsonString(tableRef)), new JsonTableRefToTableRef());
}
+ return table;
+ }
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public String getQuery() {
- return query == null ? null : query.get();
- }
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ */
+ @Nullable
+ public ValueProvider<TableReference> getTableProvider() {
+ return jsonTableRef == null
+ ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ }
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ */
+ @Nullable
+ public TableReference getTable() {
+ ValueProvider<TableReference> provider = getTableProvider();
+ return provider == null ? null : provider.get();
+ }
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public ValueProvider<String> getQueryProvider() {
- return query;
- }
+ /**
+ * Returns the query to be read, or {@code null} if reading from a table instead.
+ */
+ @Nullable
+ public String getQuery() {
+ return query == null ? null : query.get();
+ }
- /**
- * Returns true if table validation is enabled.
- */
- public boolean getValidate() {
- return validate;
- }
+ /**
+ * Returns the query to be read, or {@code null} if reading from a table instead.
+ */
+ @Nullable
+ public ValueProvider<String> getQueryProvider() {
+ return query;
+ }
- /**
- * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
- */
- public Boolean getFlattenResults() {
- return flattenResults;
- }
+ /**
+ * Returns true if table validation is enabled.
+ */
+ public boolean getValidate() {
+ return validate;
+ }
- /**
- * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
- * if not applicable.
- */
- @Nullable
- public Boolean getUseLegacySql() {
- return useLegacySql;
- }
+ /**
+ * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
+ */
+ public Boolean getFlattenResults() {
+ return flattenResults;
+ }
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
- }
+ /**
+ * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
+ * if not applicable.
+ */
+ @Nullable
+ public Boolean getUseLegacySql() {
+ return useLegacySql;
}
- /** Disallow construction of utility class. */
- private Read() {}
+ private BigQueryServices getBigQueryServices() {
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
+ }
+ return bigQueryServices;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index c9061a3..bb1528b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -665,28 +665,28 @@ public class BigQueryIOTest implements Serializable {
@Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
private void checkReadTableObject(
- BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
- checkReadTableObjectWithValidate(bound, project, dataset, table, true);
+ BigQueryIO.Read read, String project, String dataset, String table) {
+ checkReadTableObjectWithValidate(read, project, dataset, table, true);
}
- private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
- checkReadQueryObjectWithValidate(bound, query, true);
+ private void checkReadQueryObject(BigQueryIO.Read read, String query) {
+ checkReadQueryObjectWithValidate(read, query, true);
}
private void checkReadTableObjectWithValidate(
- BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) {
- assertEquals(project, bound.getTable().getProjectId());
- assertEquals(dataset, bound.getTable().getDatasetId());
- assertEquals(table, bound.getTable().getTableId());
- assertNull(bound.query);
- assertEquals(validate, bound.getValidate());
+ BigQueryIO.Read read, String project, String dataset, String table, boolean validate) {
+ assertEquals(project, read.getTable().getProjectId());
+ assertEquals(dataset, read.getTable().getDatasetId());
+ assertEquals(table, read.getTable().getTableId());
+ assertNull(read.query);
+ assertEquals(validate, read.getValidate());
}
private void checkReadQueryObjectWithValidate(
- BigQueryIO.Read.Bound bound, String query, boolean validate) {
- assertNull(bound.getTable());
- assertEquals(query, bound.getQuery());
- assertEquals(validate, bound.getValidate());
+ BigQueryIO.Read read, String query, boolean validate) {
+ assertNull(read.getTable());
+ assertEquals(query, read.getQuery());
+ assertEquals(validate, read.getValidate());
}
private void checkWriteObject(
@@ -728,39 +728,39 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildTableBasedSource() {
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
- checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+ BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+ checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedSource() {
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
- checkReadQueryObject(bound, "foo_query");
+ BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+ checkReadQueryObject(read, "foo_query");
}
@Test
public void testBuildTableBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read read =
BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
- checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
+ checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
}
@Test
public void testBuildQueryBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read read =
BigQueryIO.Read.fromQuery("some_query").withoutValidation();
- checkReadQueryObjectWithValidate(bound, "some_query", false);
+ checkReadQueryObjectWithValidate(read, "some_query", false);
}
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
- BigQueryIO.Read.Bound bound =
+ BigQueryIO.Read read =
BigQueryIO.Read.from("somedataset.sometable");
- checkReadTableObject(bound, null, "somedataset", "sometable");
+ checkReadTableObject(read, null, "somedataset", "sometable");
}
@Test
@@ -769,8 +769,8 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
- checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+ BigQueryIO.Read read = BigQueryIO.Read.from(table);
+ checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
@@ -807,39 +807,6 @@ public class BigQueryIOTest implements Serializable {
@Test
@Category(NeedsRunner.class)
- public void testBuildSourceWithoutTableQueryOrValidation() {
- BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
- bqOptions.setProject("defaultProject");
- bqOptions.setTempLocation("gs://testbucket/testdir");
-
- Pipeline p = TestPipeline.create(bqOptions);
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "Invalid BigQueryIO.Read: one of table reference and query must be set");
- p.apply(BigQueryIO.Read.withoutValidation());
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testBuildSourceWithTableAndQuery() {
- BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
- bqOptions.setProject("defaultProject");
- bqOptions.setTempLocation("gs://testbucket/testdir");
-
- Pipeline p = TestPipeline.create(bqOptions);
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage(
- "Invalid BigQueryIO.Read: table reference and query may not both be set");
- p.apply("ReadMyTable",
- BigQueryIO.Read
- .from("foo.com:project:somedataset.sometable")
- .fromQuery("query"));
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
public void testBuildSourceWithTableAndFlatten() {
BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
bqOptions.setProject("defaultProject");
@@ -1291,12 +1258,11 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testBuildSourceDisplayData() {
+ public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
- BigQueryIO.Read.Bound read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.Read
.from(tableSpec)
- .fromQuery("myQuery")
.withoutResultFlattening()
.usingStandardSql()
.withoutValidation();
@@ -1304,6 +1270,21 @@ public class BigQueryIOTest implements Serializable {
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("table", tableSpec));
+ assertThat(displayData, hasDisplayItem("flattenResults", false));
+ assertThat(displayData, hasDisplayItem("useLegacySql", false));
+ assertThat(displayData, hasDisplayItem("validation", false));
+ }
+
+ @Test
+ public void testBuildSourceDisplayDataQuery() {
+ BigQueryIO.Read read = BigQueryIO.Read
+ .fromQuery("myQuery")
+ .withoutResultFlattening()
+ .usingStandardSql()
+ .withoutValidation();
+
+ DisplayData displayData = DisplayData.from(read);
+
assertThat(displayData, hasDisplayItem("query", "myQuery"));
assertThat(displayData, hasDisplayItem("flattenResults", false));
assertThat(displayData, hasDisplayItem("useLegacySql", false));
@@ -1315,7 +1296,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read.Bound read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.Read
.from("project:dataset.tableId")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -1332,7 +1313,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read.Bound read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.Read
.fromQuery("foobar")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -2375,7 +2356,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+ BigQueryIO.Read read = BigQueryIO.Read.from(
options.getInputTable()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.
@@ -2388,7 +2369,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+ BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
options.getInputQuery()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.
@@ -2497,10 +2478,10 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Pipeline pipeline = TestPipeline.create(options);
bqOptions.setTempLocation("gs://testbucket/testdir");
- BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery(
+ BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
options.getInputQuery()).withoutValidation();
pipeline.apply(read1);
- BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery(
+ BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
options.getInputQuery()).withoutValidation();
pipeline.apply(read2);
assertNotEquals(read1.stepUuid, read2.stepUuid);
[09/10] beam git commit: Replace BigQueryIO.Write.to() with
BigQueryIO.write().to()
Posted by tg...@apache.org.
Replace BigQueryIO.Write.to() with BigQueryIO.write().to()
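The change is mechanical at call sites; a minimal before/after sketch (destination table
and input collection hypothetical):

    // Before: static factory on the Write class.
    rows.apply(BigQueryIO.Write.to("project:dataset.table").withSchema(schema));

    // After: lowercase factory method returning a configured Write instance.
    rows.apply(BigQueryIO.write().to("project:dataset.table").withSchema(schema));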
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/101715a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/101715a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/101715a7
Branch: refs/heads/master
Commit: 101715a788509aa9bdfdefd54eaceca35feca485
Parents: 1a252a7
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:21:39 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:39 2017 -0700
----------------------------------------------------------------------
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/StreamingWordExtract.java | 2 +-
.../examples/complete/TrafficMaxLaneFlow.java | 2 +-
.../beam/examples/complete/TrafficRoutes.java | 2 +-
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/cookbook/FilterExamples.java | 2 +-
.../examples/cookbook/MaxPerKeyExamples.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../complete/game/utils/WriteToBigQuery.java | 2 +-
.../game/utils/WriteWindowedToBigQuery.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 46 ++++++++++++--------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 46 ++++++++++----------
13 files changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index 861a292..fba3dc0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -491,7 +491,7 @@ public class AutoComplete {
toWrite
.apply(ParDo.of(new FormatForBigquery()))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(tableRef)
.withSchema(FormatForBigquery.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index e8d8950..2e7d451 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -136,7 +136,7 @@ public class StreamingWordExtract {
.apply(ParDo.of(new ExtractWords()))
.apply(ParDo.of(new Uppercase()))
.apply(ParDo.of(new StringToRowConverter()))
- .apply(BigQueryIO.Write.to(tableSpec)
+ .apply(BigQueryIO.write().to(tableSpec)
.withSchema(StringToRowConverter.getSchema()));
PipelineResult result = pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 412f7fb..c9508eb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -348,7 +348,7 @@ public class TrafficMaxLaneFlow {
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
.apply(new MaxLaneFlow())
- .apply(BigQueryIO.Write.to(tableRef)
+ .apply(BigQueryIO.write().to(tableRef)
.withSchema(FormatMaxesFn.getSchema()));
// Run the pipeline.
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 50d3ae4..fc5eb89 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -359,7 +359,7 @@ public class TrafficRoutes {
Duration.standardMinutes(options.getWindowDuration())).
every(Duration.standardMinutes(options.getWindowSlideEvery()))))
.apply(new TrackSpeed())
- .apply(BigQueryIO.Write.to(tableRef)
+ .apply(BigQueryIO.write().to(tableRef)
.withSchema(FormatStatsFn.getSchema()));
// Run the pipeline.
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index d3c9167..f8bc104 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -158,7 +158,7 @@ public class BigQueryTornadoes {
p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new CountTornadoes())
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index fc54b13..a7016b0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -202,7 +202,7 @@ public class CombinePerKeyExamples {
p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new PlaysForWord())
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 714a8f2..7adf7c6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -241,7 +241,7 @@ public class FilterExamples {
p.apply(BigQueryIO.read().from(options.getInput()))
.apply(ParDo.of(new ProjectionFn()))
.apply(new BelowGlobalMean(options.getMonthFilter()))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 7e7bc72..a8dc7f9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -151,7 +151,7 @@ public class MaxPerKeyExamples {
p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new MaxMeanTemp())
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index af4a692..7048bde 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -452,7 +452,7 @@ public class TriggerExample {
.apply(new CalculateTotalFlow(options.getWindowDuration()));
for (int i = 0; i < resultList.size(); i++) {
- resultList.get(i).apply(BigQueryIO.Write.to(tableRef).withSchema(getSchema()));
+ resultList.get(i).apply(BigQueryIO.write().to(tableRef).withSchema(getSchema()));
}
PipelineResult result = pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
index 1f33915..c124624 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java
@@ -121,7 +121,7 @@ public class WriteToBigQuery<InputT>
public PDone expand(PCollection<InputT> teamAndScore) {
return teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(getTable(teamAndScore.getPipeline(),
tableName))
.withSchema(getSchema())
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
index 7a4fb2c..7d16fa9 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java
@@ -60,7 +60,7 @@ public class WriteWindowedToBigQuery<T>
public PDone expand(PCollection<T> teamAndScore) {
return teamAndScore
.apply("ConvertToRow", ParDo.of(new BuildRowFn()))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(getTable(teamAndScore.getPipeline(),
tableName))
.withSchema(getSchema())
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index dfb7ea6..3e699d2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1351,12 +1351,12 @@ public class BigQueryIO {
* exist.
*
* <p>By default, tables will be created if they do not exist, which corresponds to a
- * {@link CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's
- * Jobs API. A schema must be provided (via {@link BigQueryIO.Write#withSchema(TableSchema)}),
+ * {@link Write.CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of
+ * BigQuery's Jobs API. A schema must be provided (via {@link Write#withSchema(TableSchema)}),
* or else the transform may fail at runtime with an {@link IllegalArgumentException}.
*
* <p>By default, writes require an empty table, which corresponds to
- * a {@link WriteDisposition#WRITE_EMPTY} disposition that matches the
+ * a {@link Write.WriteDisposition#WRITE_EMPTY} disposition that matches the
* default of BigQuery's Jobs API.
*
* <p>Here is a sample transform that produces TableRow values containing
@@ -1371,6 +1371,16 @@ public class BigQueryIO {
* }
* }}</pre>
*/
+ public static Write write() {
+ return new AutoValue_BigQueryIO_Write.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
+ .setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
+ .build();
+ }
+
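As a usage sketch of the defaults established here (destination hypothetical), a bare
write() validates tables, creates them if needed, and requires an empty destination;
each default can be overridden fluently:

    BigQueryIO.Write write = BigQueryIO.write()
        .to("project:dataset.table")
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) // overrides WRITE_EMPTY
        .withoutValidation();                                                 // overrides validate=true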
+ /** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
@VisibleForTesting
@@ -1416,14 +1426,6 @@ public class BigQueryIO {
abstract Write build();
}
- private static Builder builder() {
- return new AutoValue_BigQueryIO_Write.Builder()
- .setValidate(true)
- .setBigQueryServices(new BigQueryServicesImpl())
- .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
- }
-
/**
* An enumeration type for the BigQuery create disposition strings.
*
@@ -1491,23 +1493,30 @@ public class BigQueryIO {
WRITE_EMPTY
}
+ /** Ensures that methods of the to() family are called at most once. */
+ private void ensureToNotCalledYet() {
+ checkState(
+ getJsonTableRef() == null && getTable() == null, "to() already called");
+ }
+
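The guard's effect, as a sketch (table specs hypothetical): reconfiguring the destination
now fails fast rather than silently replacing the earlier setting:

    BigQueryIO.Write write = BigQueryIO.write().to("project:dataset.first");
    write.to("project:dataset.second"); // throws IllegalStateException: "to() already called"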
/**
* Creates a write transformation for the given table specification.
*
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
*/
- public static Write to(String tableSpec) {
+ public Write to(String tableSpec) {
return to(StaticValueProvider.of(tableSpec));
}
/** Creates a write transformation for the given table. */
- public static Write to(TableReference table) {
+ public Write to(TableReference table) {
return to(StaticValueProvider.of(toTableSpec(table)));
}
/** Creates a write transformation for the given table. */
- public static Write to(ValueProvider<String> tableSpec) {
- return builder()
+ public Write to(ValueProvider<String> tableSpec) {
+ ensureToNotCalledYet();
+ return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -1526,7 +1535,7 @@ public class BigQueryIO {
* <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
* always return the same table specification.
*/
- public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ public Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
}
@@ -1537,9 +1546,10 @@ public class BigQueryIO {
* <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
* always return the same table reference.
*/
- private static Write toTableReference(
+ private Write toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return builder().setTableRefFunction(tableRefFunction).build();
+ ensureToNotCalledYet();
+ return toBuilder().setTableRefFunction(tableRefFunction).build();
}
private static class TranslateTableSpecFunction implements
http://git-wip-us.apache.org/repos/asf/beam/blob/101715a7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f6a7fb4..8a53d02 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -963,7 +963,7 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "b").set("number", 2),
new TableRow().set("name", "c").set("number", 3))
.withCoder(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write.to("dataset-id.table-id")
+ .apply(BigQueryIO.write().to("dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
@@ -997,7 +997,7 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "d").set("number", 4))
.withCoder(TableRowJsonCoder.of()))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
- .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+ .apply(BigQueryIO.write().to("project-id:dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
ImmutableList.of(
@@ -1153,7 +1153,7 @@ public class BigQueryIOTest implements Serializable {
p.apply(Create.of(inserts).withCoder(TableRowJsonCoder.of()))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(Window.<TableRow>into(window))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to(tableFunction)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema().setFields(
@@ -1205,7 +1205,7 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "b").set("number", 2),
new TableRow().set("name", "c").set("number", 3))
.withCoder(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+ .apply(BigQueryIO.write().to("project-id:dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withTestServices(fakeBqServices)
.withoutValidation());
@@ -1238,7 +1238,7 @@ public class BigQueryIOTest implements Serializable {
new TableRow().set("name", "b").set("number", 2),
new TableRow().set("name", "c").set("number", 3))
.withCoder(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write.to("dataset-id.table-id")
+ .apply(BigQueryIO.write().to("dataset-id.table-id")
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withTestServices(fakeBqServices)
.withoutValidation());
@@ -1328,7 +1328,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWrite() {
BigQueryIO.Write write =
- BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
+ BigQueryIO.write().to("foo.com:project:somedataset.sometable");
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1354,7 +1354,7 @@ public class BigQueryIOTest implements Serializable {
options.as(StreamingOptions.class).setStreaming(streaming);
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("project:dataset.table")
.withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
.withTestServices(new FakeBigQueryServices()
@@ -1375,7 +1375,7 @@ public class BigQueryIOTest implements Serializable {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Write write =
- BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
+ BigQueryIO.write().to("foo.com:project:somedataset.sometable").withoutValidation();
checkWriteObjectWithValidate(
write,
"foo.com:project",
@@ -1390,7 +1390,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteDefaultProject() {
- BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
+ BigQueryIO.Write write = BigQueryIO.write().to("somedataset.sometable");
checkWriteObject(
write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1402,7 +1402,7 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Write write = BigQueryIO.Write.to(table);
+ BigQueryIO.Write write = BigQueryIO.write().to(table);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1412,7 +1412,7 @@ public class BigQueryIOTest implements Serializable {
public void testBuildWriteWithSchema() {
TableSchema schema = new TableSchema();
BigQueryIO.Write write =
- BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
+ BigQueryIO.write().to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
write, "foo.com:project", "somedataset", "sometable",
schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
@@ -1420,7 +1420,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithCreateDispositionNever() {
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
@@ -1430,7 +1430,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithCreateDispositionIfNeeded() {
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
@@ -1440,7 +1440,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithWriteDispositionTruncate() {
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
@@ -1450,7 +1450,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithWriteDispositionAppend() {
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
@@ -1460,7 +1460,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithWriteDispositionEmpty() {
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
@@ -1471,7 +1471,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteWithWriteWithTableDescription() {
final String tblDescription = "foo bar table";
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to("foo.com:project:somedataset.sometable")
.withTableDescription(tblDescription);
checkWriteObject(
@@ -1491,7 +1491,7 @@ public class BigQueryIOTest implements Serializable {
TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
final String tblDescription = "foo bar table";
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to(tableSpec)
.withSchema(schema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1556,7 +1556,7 @@ public class BigQueryIOTest implements Serializable {
.or(Matchers.containsString("BigQuery dataset not found for table")));
tableRows
.apply(
- BigQueryIO.Write.to(tableRef)
+ BigQueryIO.write().to(tableRef)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(new TableSchema())
.withTestServices(fakeBqServices));
@@ -1603,7 +1603,7 @@ public class BigQueryIOTest implements Serializable {
}))
.setCoder(TableRowJsonCoder.of());
tableRows
- .apply(BigQueryIO.Write.to(tableRef)
+ .apply(BigQueryIO.write().to(tableRef)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withoutValidation());
}
@@ -1675,7 +1675,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBigQueryIOGetName() {
assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
- assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
+ assertEquals("BigQueryIO.Write", BigQueryIO.write().to("somedataset.sometable").getName());
}
@Test
@@ -1686,7 +1686,7 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage("no schema was provided");
p
.apply(Create.empty(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write
+ .apply(BigQueryIO.write()
.to("dataset.table")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
}
@@ -2343,7 +2343,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Write write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.write()
.to(options.getOutputTable())
.withSchema(NestedValueProvider.of(
options.getOutputSchema(), new JsonSchemaToTableSchema()))
[03/10] beam git commit: Condense BigQueryIO.Write.Bound into
BigQueryIO.Write
Posted by tg...@apache.org.
Condense BigQueryIO.Write.Bound into BigQueryIO.Write
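For callers, the condensation changes only the declared type; a minimal sketch (table
spec hypothetical):

    // Before: the chained factory produced the nested Bound type.
    BigQueryIO.Write.Bound write = BigQueryIO.Write.to("project:dataset.table");

    // After: the same chain yields Write directly.
    BigQueryIO.Write write = BigQueryIO.Write.to("project:dataset.table");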
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d1f4400
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d1f4400
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d1f4400
Branch: refs/heads/master
Commit: 7d1f4400ab844c7b4e636482891be55174390431
Parents: 825338a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:26:09 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:24 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1080 +++++++++---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 116 +-
2 files changed, 552 insertions(+), 644 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f6c8575..90d7f67 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1415,7 +1415,43 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Write {
+ public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ // Maximum number of files in a single partition.
+ static final int MAX_NUM_FILES = 10000;
+
+ // Maximum number of bytes in a single partition -- 11 TiB, just under BQ's 12 TiB limit.
+ static final long MAX_SIZE_BYTES = 11 * (1L << 40);
+
+ // The maximum number of retry jobs.
+ private static final int MAX_RETRY_JOBS = 3;
+
+ // The maximum number of retries to poll the status of a job.
+ // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+ private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ @Nullable private final ValueProvider<String> jsonTableRef;
+
+ @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+
+ // Table schema. The schema is required only if the table does not exist.
+ @Nullable private final ValueProvider<String> jsonSchema;
+
+ // Options for creating the table. Valid values are CREATE_IF_NEEDED and
+ // CREATE_NEVER.
+ final CreateDisposition createDisposition;
+
+ // Options for writing to the table. Valid values are WRITE_TRUNCATE,
+ // WRITE_APPEND and WRITE_EMPTY.
+ final WriteDisposition writeDisposition;
+
+ @Nullable
+ final String tableDescription;
+
+ // An option to indicate if table validation is desired. Default is true.
+ final boolean validate;
+
+ @Nullable private BigQueryServices bigQueryServices;
+
/**
* An enumeration type for the BigQuery create disposition strings.
*
@@ -1488,18 +1524,18 @@ public class BigQueryIO {
*
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
*/
- public static Bound to(String tableSpec) {
- return new Bound().to(tableSpec);
+ public static Write to(String tableSpec) {
+ return new Write().withTableSpec(tableSpec);
}
/** Creates a write transformation for the given table. */
- public static Bound to(ValueProvider<String> tableSpec) {
- return new Bound().to(tableSpec);
+ public static Write to(ValueProvider<String> tableSpec) {
+ return new Write().withTableSpec(tableSpec);
}
/** Creates a write transformation for the given table. */
- public static Bound to(TableReference table) {
- return new Bound().to(table);
+ public static Write to(TableReference table) {
+ return new Write().withTableRef(table);
}
/**
@@ -1513,8 +1549,8 @@ public class BigQueryIO {
* <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
* always return the same table specification.
*/
- public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return new Bound().to(tableSpecFunction);
+ public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return new Write().withTableSpec(tableSpecFunction);
}
/**
@@ -1524,634 +1560,547 @@ public class BigQueryIO {
* <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
* always return the same table reference.
*/
- public static Bound toTableReference(
+ private static Write toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Bound().toTableReference(tableRefFunction);
+ return new Write().withTableRef(tableRefFunction);
+ }
+
+ private static class TranslateTableSpecFunction implements
+ SerializableFunction<BoundedWindow, TableReference> {
+ private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+
+ TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ this.tableSpecFunction = tableSpecFunction;
+ }
+
+ @Override
+ public TableReference apply(BoundedWindow value) {
+ return parseTableSpec(tableSpecFunction.apply(value));
+ }
+ }
+
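A sketch of the per-window routing this adapter enables (names hypothetical; real code
would likely format a calendar date rather than raw millis). The function must be
deterministic, returning the same spec whenever it is given the same window:

    BigQueryIO.Write perWindow = BigQueryIO.Write.to(
        new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            // One table per window, keyed by the window's end timestamp.
            return "project:dataset.events_" + window.maxTimestamp().getMillis();
          }
        });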
+ private Write() {
+ this(
+ null /* name */,
+ null /* jsonTableRef */,
+ null /* tableRefFunction */,
+ null /* jsonSchema */,
+ CreateDisposition.CREATE_IF_NEEDED,
+ WriteDisposition.WRITE_EMPTY,
+ null /* tableDescription */,
+ true /* validate */,
+ null /* bigQueryServices */);
+ }
+
+ private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
+ @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+ @Nullable ValueProvider<String> jsonSchema,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition,
+ @Nullable String tableDescription,
+ boolean validate,
+ @Nullable BigQueryServices bigQueryServices) {
+ super(name);
+ this.jsonTableRef = jsonTableRef;
+ this.tableRefFunction = tableRefFunction;
+ this.jsonSchema = jsonSchema;
+ this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+ this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+ this.tableDescription = tableDescription;
+ this.validate = validate;
+ this.bigQueryServices = bigQueryServices;
}
/**
- * Creates a write transformation with the specified schema to use in table creation.
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
*
- * <p>The schema is <i>required</i> only if writing to a table that does not already
- * exist, and {@link CreateDisposition} is set to
- * {@link CreateDisposition#CREATE_IF_NEEDED}.
+ * <p>Does not modify this object.
*/
- public static Bound withSchema(TableSchema schema) {
- return new Bound().withSchema(schema);
+ private Write withTableSpec(String tableSpec) {
+ return withTableRef(NestedValueProvider.of(
+ StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
}
/**
- * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
*/
- public static Bound withSchema(ValueProvider<TableSchema> schema) {
- return new Bound().withSchema(schema);
+ public Write withTableRef(TableReference table) {
+ return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
}
- /** Creates a write transformation with the specified options for creating the table. */
- public static Bound withCreateDisposition(CreateDisposition disposition) {
- return new Bound().withCreateDisposition(disposition);
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableSpec(ValueProvider<String> tableSpec) {
+ return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
}
- /** Creates a write transformation with the specified options for writing to the table. */
- public static Bound withWriteDisposition(WriteDisposition disposition) {
- return new Bound().withWriteDisposition(disposition);
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ private Write withTableRef(ValueProvider<TableReference> table) {
+ return new Write(name,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, bigQueryServices);
}
- /** Creates a write transformation with the specified table description. */
- public static Bound withTableDescription(@Nullable String tableDescription) {
- return new Bound().withTableDescription(tableDescription);
+ /**
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
+ * should always return the same table specification.
+ */
+ private Write withTableSpec(
+ SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
}
/**
- * Creates a write transformation with BigQuery table validation disabled.
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+ * always return the same table reference.
*/
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
+ private Write withTableRef(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, bigQueryServices);
}
/**
- * A {@link PTransform} that can write either a bounded or unbounded
- * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
+ * Returns a copy of this write transformation, but using the specified schema for rows
+ * to be written.
+ *
+ * <p>Does not modify this object.
*/
- public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
- // Maximum number of files in a single partition.
- static final int MAX_NUM_FILES = 10000;
-
- // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
- static final long MAX_SIZE_BYTES = 11 * (1L << 40);
-
- // The maximum number of retry jobs.
- static final int MAX_RETRY_JOBS = 3;
-
- // The maximum number of retries to poll the status of a job.
- // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
- static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
- @Nullable final ValueProvider<String> jsonTableRef;
-
- @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
- // Table schema. The schema is required only if the table does not exist.
- @Nullable final ValueProvider<String> jsonSchema;
+ public Write withSchema(TableSchema schema) {
+ return new Write(name, jsonTableRef, tableRefFunction,
+ StaticValueProvider.of(toJsonString(schema)),
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // Options for creating the table. Valid values are CREATE_IF_NEEDED and
- // CREATE_NEVER.
- final CreateDisposition createDisposition;
+ /**
+ * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ */
+ public Write withSchema(ValueProvider<TableSchema> schema) {
+ return new Write(name, jsonTableRef, tableRefFunction,
+ NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // Options for writing to the table. Valid values are WRITE_TRUNCATE,
- // WRITE_APPEND and WRITE_EMPTY.
- final WriteDisposition writeDisposition;
+ /**
+ * Returns a copy of this write transformation, but using the specified create disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withCreateDisposition(CreateDisposition createDisposition) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- @Nullable final String tableDescription;
+ /**
+ * Returns a copy of this write transformation, but using the specified write disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withWriteDisposition(WriteDisposition writeDisposition) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // An option to indicate if table validation is desired. Default is true.
- final boolean validate;
+ /**
+ * Returns a copy of this write transformation, but using the specified table description.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableDescription(@Nullable String tableDescription) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- @Nullable private BigQueryServices bigQueryServices;
+ /**
+ * Returns a copy of this write transformation, but without BigQuery table validation.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withoutValidation() {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, false, bigQueryServices);
+ }
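Because each withX method above returns a copy, a configured transform can be shared
safely; a sketch (table spec hypothetical):

    BigQueryIO.Write validated = BigQueryIO.Write.to("project:dataset.table");
    BigQueryIO.Write unvalidated = validated.withoutValidation();
    // 'validated' still carries validate == true; only the copy skips validation.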
- private static class TranslateTableSpecFunction implements
- SerializableFunction<BoundedWindow, TableReference> {
- private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+ @VisibleForTesting
+ Write withTestServices(BigQueryServices testServices) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, testServices);
+ }
- TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- this.tableSpecFunction = tableSpecFunction;
+ private static void verifyTableNotExistOrEmpty(
+ DatasetService datasetService,
+ TableReference tableRef) {
+ try {
+ if (datasetService.getTable(tableRef) != null) {
+ checkState(
+ datasetService.isTableEmpty(tableRef),
+ "BigQuery table is not empty: %s.",
+ BigQueryIO.toTableSpec(tableRef));
}
-
- @Override
- public TableReference apply(BoundedWindow value) {
- return parseTableSpec(tableSpecFunction.apply(value));
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
+ throw new RuntimeException(
+ "unable to confirm BigQuery table emptiness for table "
+ + BigQueryIO.toTableSpec(tableRef), e);
}
+ }
- /**
- * @deprecated Should be private. Instead, use one of the factory methods in
- * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
- * instance of this class.
- */
- @Deprecated
- public Bound() {
- this(
- null /* name */,
- null /* jsonTableRef */,
- null /* tableRefFunction */,
- null /* jsonSchema */,
- CreateDisposition.CREATE_IF_NEEDED,
- WriteDisposition.WRITE_EMPTY,
- null /* tableDescription */,
- true /* validate */,
- null /* bigQueryServices */);
- }
-
- private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<String> jsonSchema,
- CreateDisposition createDisposition,
- WriteDisposition writeDisposition,
- @Nullable String tableDescription,
- boolean validate,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.tableRefFunction = tableRefFunction;
- this.jsonSchema = jsonSchema;
- this.createDisposition = checkNotNull(createDisposition, "createDisposition");
- this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
- this.tableDescription = tableDescription;
- this.validate = validate;
- this.bigQueryServices = bigQueryServices;
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Bound to(String tableSpec) {
- return toTableRef(NestedValueProvider.of(
- StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
- }
+ @Override
+ public void validate(PCollection<TableRow> input) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Bound to(TableReference table) {
- return to(StaticValueProvider.of(toTableSpec(table)));
- }
+ // Exactly one of the table reference and the table function can be configured.
+ checkState(
+ jsonTableRef != null || tableRefFunction != null,
+ "must set the table reference of a BigQueryIO.Write transform");
+ checkState(
+ jsonTableRef == null || tableRefFunction == null,
+ "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ + " transform");
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Bound to(ValueProvider<String> tableSpec) {
- return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+ // Require a schema if creating one or more tables.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+ // The user specified a table.
+ if (jsonTableRef != null && validate) {
+ TableReference table = getTableWithDefaultProject(options).get();
+
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
+ verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ verifyTableNotExistOrEmpty(datasetService, table);
+ }
}
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- private Bound toTableRef(ValueProvider<TableReference> table) {
- return new Bound(name,
- NestedValueProvider.of(table, new TableRefToJson()),
- tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+ // We will use BigQuery's streaming write API -- validate supported dispositions.
+ if (tableRefFunction != null) {
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+ + " function.");
+ }
+ if (jsonSchema == null) {
+ checkArgument(
+ createDisposition == CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+ }
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
- * should always return the same table specification.
- */
- public Bound to(
- SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+ checkArgument(
+ writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ + " when using a tablespec function.");
+ } else {
+ // We will use a BigQuery load job -- validate the temp location.
+ String tempLocation = options.getTempLocation();
+ checkArgument(
+ !Strings.isNullOrEmpty(tempLocation),
+ "BigQueryIO.Write needs a GCS temp location to store temp files.");
+ if (bigQueryServices == null) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+ tempLocation),
+ e);
+ }
+ }
}
+ }
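
As a concrete illustration of the checks above, here is a hedged sketch of two pipelines that would fail this validate() at construction time. It assumes the pre-2.0 API shown in this diff and an existing Pipeline p; the messages quote the checkState/checkArgument strings above.

    // 1) No table reference and no table function: fails the first checkState.
    p.apply(Create.empty(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write.withoutValidation());
    // -> IllegalStateException: "must set the table reference of a BigQueryIO.Write transform"

    // 2) CREATE_IF_NEEDED (the default) with no schema: fails the checkArgument.
    p.apply(Create.empty(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write.to("project:dataset.table"));
    // -> IllegalArgumentException: "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."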
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
- * always return the same table reference.
- */
- public Bound toTableReference(
- SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
+ @Override
+ public PDone expand(PCollection<TableRow> input) {
+ Pipeline p = input.getPipeline();
+ BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+ BigQueryServices bqServices = getBigQueryServices();
+
+ // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
+ // StreamWithDeDup and BigQuery's streaming import API.
+ if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+ return input.apply(
+ new StreamWithDeDup(getTable(), tableRefFunction,
+ jsonSchema == null ? null : NestedValueProvider.of(
+ jsonSchema, new JsonSchemaToTableSchema()),
+ createDisposition,
+ tableDescription,
+ bqServices));
}
- /**
- * Returns a copy of this write transformation, but using the specified schema for rows
- * to be written.
- *
- * <p>Does not modify this object.
- */
- public Bound withSchema(TableSchema schema) {
- return new Bound(name, jsonTableRef, tableRefFunction,
- StaticValueProvider.of(toJsonString(schema)),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- /**
- * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
- */
- public Bound withSchema(ValueProvider<TableSchema> schema) {
- return new Bound(name, jsonTableRef, tableRefFunction,
- NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ String stepUuid = randomUUIDString();
- /**
- * Returns a copy of this write transformation, but using the specified create disposition.
- *
- * <p>Does not modify this object.
- */
- public Bound withCreateDisposition(CreateDisposition createDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ String tempLocation = options.getTempLocation();
+ String tempFilePrefix;
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ tempFilePrefix = factory.resolve(
+ factory.resolve(tempLocation, "BigQueryWriteTemp"),
+ stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+ e);
}
- /**
- * Returns a copy of this write transformation, but using the specified write disposition.
- *
- * <p>Does not modify this object.
- */
- public Bound withWriteDisposition(WriteDisposition writeDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ // Create a singleton job ID token at execution time.
+ PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+ PCollectionView<String> jobIdTokenView = p
+ .apply("TriggerIdCreation", Create.of("ignored"))
+ .apply("CreateJobId", MapElements.via(
+ new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return randomUUIDString();
+ }
+ }))
+ .apply(View.<String>asSingleton());
+
+ PCollection<TableRow> inputInGlobalWindow =
+ input.apply(
+ Window.<TableRow>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> results = inputInGlobalWindow
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles(tempFilePrefix)));
+
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<Long, List<String>>> singlePartitionTag =
+ new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+ PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+ .apply("ResultsView", View.<KV<String, Long>>asIterable());
+ PCollectionTuple partitions = singleton.apply(ParDo
+ .of(new WritePartition(
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+ // Write multiple partitions to separate temporary tables
+ PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+ .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+ false,
+ bqServices,
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ jsonSchema,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ tableDescription))
+ .withSideInputs(jobIdTokenView));
+
+ PCollectionView<Iterable<String>> tempTablesView = tempTables
+ .apply("TempTablesView", View.<String>asIterable());
+ singleton.apply(ParDo
+ .of(new WriteRename(
+ bqServices,
+ jobIdTokenView,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ writeDisposition,
+ createDisposition,
+ tempTablesView,
+ tableDescription))
+ .withSideInputs(tempTablesView, jobIdTokenView));
+
+ // Write single partition to final table
+ partitions.get(singlePartitionTag)
+ .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+ true,
+ bqServices,
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ jsonSchema,
+ writeDisposition,
+ createDisposition,
+ tableDescription))
+ .withSideInputs(jobIdTokenView));
- /**
- * Returns a copy of this write transformation, but using the specified table description.
- *
- * <p>Does not modify this object.
- */
- public Bound withTableDescription(@Nullable String tableDescription) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ return PDone.in(input.getPipeline());
+ }
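
The expand() above implements the batch load-job path for bounded inputs with a fixed table. A minimal, hedged sketch of a pipeline that exercises it; project, dataset, and field names are placeholders, and options is assumed to carry a gs:// tempLocation per the validation above.

    Pipeline p = Pipeline.create(options);
    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));
    p.apply(Create.of(new TableRow().set("word", "beam").set("count", 1))
            .withCoder(TableRowJsonCoder.of()))
     .apply(BigQueryIO.Write
         .to("my-project:my_dataset.my_table")   // placeholder table spec
         .withSchema(schema)
         .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    p.run();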
- /**
- * Returns a copy of this write transformation, but without BigQuery table validation.
- *
- * <p>Does not modify this object.
- */
- public Bound withoutValidation() {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, false, bigQueryServices);
- }
+ private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
+ private transient TableRowWriter writer = null;
+ private final String tempFilePrefix;
- @VisibleForTesting
- Bound withTestServices(BigQueryServices testServices) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, testServices);
+ WriteBundles(String tempFilePrefix) {
+ this.tempFilePrefix = tempFilePrefix;
}
- private static void verifyTableNotExistOrEmpty(
- DatasetService datasetService,
- TableReference tableRef) {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ if (writer == null) {
+ writer = new TableRowWriter(tempFilePrefix);
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {}", writer);
+ }
try {
- if (datasetService.getTable(tableRef) != null) {
- checkState(
- datasetService.isTableEmpty(tableRef),
- "BigQuery table is not empty: %s.",
- BigQueryIO.toTableSpec(tableRef));
- }
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard the write result and close the writer.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
}
- throw new RuntimeException(
- "unable to confirm BigQuery table emptiness for table "
- + BigQueryIO.toTableSpec(tableRef), e);
+ throw e;
}
}
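
The catch block above uses Throwable.addSuppressed so a failure while closing does not mask the exception that caused the write to fail. The same shape in isolation, as plain Java:

    try {
      writer.write(element);
    } catch (Exception e) {
      try {
        writer.close();
      } catch (Exception closeException) {
        e.addSuppressed(closeException);  // keep the root cause as the primary exception
      }
      throw e;
    }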
- @Override
- public void validate(PCollection<TableRow> input) {
- BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
- // Exactly one of the table and table reference can be configured.
- checkState(
- jsonTableRef != null || tableRefFunction != null,
- "must set the table reference of a BigQueryIO.Write transform");
- checkState(
- jsonTableRef == null || tableRefFunction == null,
- "Cannot set both a table reference and a table function for a BigQueryIO.Write"
- + " transform");
-
- // Require a schema if creating one or more tables.
- checkArgument(
- createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
- "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
- // The user specified a table.
- if (jsonTableRef != null && validate) {
- TableReference table = getTableWithDefaultProject(options).get();
-
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
- // Check for destination table presence and emptiness for early failure notification.
- // Note that a presence check can fail when the table or dataset is created by an earlier
- // stage of the pipeline. For these cases the #withoutValidation method can be used to
- // disable the check.
- verifyDatasetPresence(datasetService, table);
- if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- verifyTablePresence(datasetService, table);
- }
- if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- verifyTableNotExistOrEmpty(datasetService, table);
- }
- }
-
- if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
- // We will use BigQuery's streaming write API -- validate supported dispositions.
- if (tableRefFunction != null) {
- checkArgument(
- createDisposition != CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
- + " function.");
- }
- if (jsonSchema == null) {
- checkArgument(
- createDisposition == CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
- }
-
- checkArgument(
- writeDisposition != WriteDisposition.WRITE_TRUNCATE,
- "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
- + " when using a tablespec function.");
- } else {
- // We will use a BigQuery load job -- validate the temp location.
- String tempLocation = options.getTempLocation();
- checkArgument(
- !Strings.isNullOrEmpty(tempLocation),
- "BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
- try {
- GcsPath.fromUri(tempLocation);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
- tempLocation),
- e);
- }
- }
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ c.output(writer.close());
+ writer = null;
}
}
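
WriteBundles above follows a lazy-open, flush-on-finish DoFn pattern that is worth seeing in isolation. A self-contained, hedged sketch of the same shape; LineWriter is a made-up stand-in for the internal TableRowWriter.

    // LineWriter is hypothetical: a tiny file writer that counts records.
    static final class LineWriter {
      private final java.io.BufferedWriter out;
      private final String path;
      private long count;
      LineWriter(String path) throws java.io.IOException {
        this.path = path;
        this.out = new java.io.BufferedWriter(new java.io.FileWriter(path));
      }
      void write(String line) throws java.io.IOException { out.write(line); out.newLine(); ++count; }
      String getPath() { return path; }
      long close() throws java.io.IOException { out.close(); return count; }
    }

    static class ShardToFiles extends DoFn<String, KV<String, Long>> {
      private transient LineWriter writer;  // opened lazily, at most once per bundle

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        if (writer == null) {
          writer = new LineWriter("/tmp/shard-" + UUID.randomUUID());
        }
        writer.write(c.element());
      }

      @FinishBundle
      public void finishBundle(Context c) throws Exception {
        if (writer != null) {
          c.output(KV.of(writer.getPath(), writer.close()));  // (file path, record count)
          writer = null;  // the next bundle opens a fresh file
        }
      }
    }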
@Override
- public PDone expand(PCollection<TableRow> input) {
- Pipeline p = input.getPipeline();
- BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices = getBigQueryServices();
-
- // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
- // StreamWithDeDup and BigQuery's streaming import API.
- if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
- return input.apply(
- new StreamWithDeDup(getTable(), tableRefFunction,
- jsonSchema == null ? null : NestedValueProvider.of(
- jsonSchema, new JsonSchemaToTableSchema()),
- createDisposition,
- tableDescription,
- bqServices));
- }
-
- ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
- String stepUuid = randomUUIDString();
-
- String tempLocation = options.getTempLocation();
- String tempFilePrefix;
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQueryWriteTemp"),
- stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
- e);
- }
-
- // Create a singleton job ID token at execution time.
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
- PCollectionView<String> jobIdTokenView = p
- .apply("TriggerIdCreation", Create.of("ignored"))
- .apply("CreateJobId", MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return randomUUIDString();
- }
- }))
- .apply(View.<String>asSingleton());
-
- PCollection<TableRow> inputInGlobalWindow =
- input.apply(
- Window.<TableRow>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> results = inputInGlobalWindow
- .apply("WriteBundles",
- ParDo.of(new WriteBundles(tempFilePrefix)));
-
- TupleTag<KV<Long, List<String>>> multiPartitionsTag =
- new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<Long, List<String>>> singlePartitionTag =
- new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
- PCollectionView<Iterable<KV<String, Long>>> resultsView = results
- .apply("ResultsView", View.<KV<String, Long>>asIterable());
- PCollectionTuple partitions = singleton.apply(ParDo
- .of(new WritePartition(
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
- // Write multiple partitions to separate temporary tables
- PCollection<String> tempTables = partitions.get(multiPartitionsTag)
- .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
- false,
- bqServices,
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- tableDescription))
- .withSideInputs(jobIdTokenView));
-
- PCollectionView<Iterable<String>> tempTablesView = tempTables
- .apply("TempTablesView", View.<String>asIterable());
- singleton.apply(ParDo
- .of(new WriteRename(
- bqServices,
- jobIdTokenView,
- NestedValueProvider.of(table, new TableRefToJson()),
- writeDisposition,
- createDisposition,
- tempTablesView,
- tableDescription))
- .withSideInputs(tempTablesView, jobIdTokenView));
-
- // Write single partition to final table
- partitions.get(singlePartitionTag)
- .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
- true,
- bqServices,
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- writeDisposition,
- createDisposition,
- tableDescription))
- .withSideInputs(jobIdTokenView));
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- return PDone.in(input.getPipeline());
+ builder
+ .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+ .withLabel("Temporary File Prefix"));
}
+ }
- private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
- private transient TableRowWriter writer = null;
- private final String tempFilePrefix;
-
- WriteBundles(String tempFilePrefix) {
- this.tempFilePrefix = tempFilePrefix;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- if (writer == null) {
- writer = new TableRowWriter(tempFilePrefix);
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {}", writer);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- c.output(writer.close());
- writer = null;
- }
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"));
- }
- }
+ builder
+ .addIfNotNull(DisplayData.item("table", jsonTableRef)
+ .withLabel("Table Reference"))
+ .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .withLabel("Table Schema"));
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
+ if (tableRefFunction != null) {
+ builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ .withLabel("Table Reference Function"));
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("table", jsonTableRef)
- .withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("schema", jsonSchema)
- .withLabel("Table Schema"));
-
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
- .withLabel("Table Reference Function"));
- }
+ builder
+ .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .withLabel("Table CreateDisposition"))
+ .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .withLabel("Table WriteDisposition"))
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"), true)
+ .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+ .withLabel("Table Description"));
+ }
- builder
- .add(DisplayData.item("createDisposition", createDisposition.toString())
- .withLabel("Table CreateDisposition"))
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
- .withLabel("Table WriteDisposition"))
- .addIfNotDefault(DisplayData.item("validation", validate)
- .withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
- .withLabel("Table Description"));
- }
+ /** Returns the create disposition. */
+ public CreateDisposition getCreateDisposition() {
+ return createDisposition;
+ }
- /** Returns the create disposition. */
- public CreateDisposition getCreateDisposition() {
- return createDisposition;
- }
+ /** Returns the write disposition. */
+ public WriteDisposition getWriteDisposition() {
+ return writeDisposition;
+ }
- /** Returns the write disposition. */
- public WriteDisposition getWriteDisposition() {
- return writeDisposition;
- }
+ /** Returns the table schema. */
+ public TableSchema getSchema() {
+ return fromJsonString(
+ jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ }
- /** Returns the table schema. */
- public TableSchema getSchema() {
- return fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ /**
+ * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+ *
+ * <p>If the table's project is not specified, use the executing project.
+ */
+ @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ BigQueryOptions bqOptions) {
+ ValueProvider<TableReference> table = getTable();
+ if (table == null) {
+ return table;
}
-
- /**
- * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
- *
- * <p>If the table's project is not specified, use the executing project.
- */
- @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
- BigQueryOptions bqOptions) {
- ValueProvider<TableReference> table = getTable();
- if (table == null) {
- return table;
- }
- if (!table.isAccessible()) {
- LOG.info("Using a dynamic value for table input. This must contain a project"
- + " in the table reference: {}", table);
- return table;
- }
- if (Strings.isNullOrEmpty(table.get().getProjectId())) {
- // If user does not specify a project we assume the table to be located in
- // the default project.
- TableReference tableRef = table.get();
- tableRef.setProjectId(bqOptions.getProject());
- return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(tableRef)), new JsonTableRefToTableRef());
- }
+ if (!table.isAccessible()) {
+ LOG.info("Using a dynamic value for table input. This must contain a project"
+ + " in the table reference: {}", table);
return table;
}
-
- /** Returns the table reference, or {@code null}. */
- @Nullable
- public ValueProvider<TableReference> getTable() {
- return jsonTableRef == null ? null :
- NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+ // If user does not specify a project we assume the table to be located in
+ // the default project.
+ TableReference tableRef = table.get();
+ tableRef.setProjectId(bqOptions.getProject());
+ return NestedValueProvider.of(StaticValueProvider.of(
+ toJsonString(tableRef)), new JsonTableRefToTableRef());
}
+ return table;
+ }
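
The method above is the one place a missing project ID gets filled in. A standalone, hedged sketch of the same defaulting step; options is assumed to be a BigQueryOptions whose project is the executing project.

    TableReference ref = new TableReference()
        .setDatasetId("somedataset")
        .setTableId("sometable");               // user omitted the project
    if (Strings.isNullOrEmpty(ref.getProjectId())) {
      ref.setProjectId(options.getProject());   // default to the executing project
    }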
- /** Returns {@code true} if table validation is enabled. */
- public boolean getValidate() {
- return validate;
- }
+ /** Returns the table reference, or {@code null}. */
+ @Nullable
+ public ValueProvider<TableReference> getTable() {
+ return jsonTableRef == null ? null :
+ NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ }
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
+ /** Returns {@code true} if table validation is enabled. */
+ public boolean getValidate() {
+ return validate;
+ }
+
+ private BigQueryServices getBigQueryServices() {
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
}
+ return bigQueryServices;
}
static class TableRowWriter {
@@ -2231,8 +2180,8 @@ public class BigQueryIO {
List<String> currResults = Lists.newArrayList();
for (int i = 0; i < results.size(); ++i) {
KV<String, Long> fileResult = results.get(i);
- if (currNumFiles + 1 > Bound.MAX_NUM_FILES
- || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+ if (currNumFiles + 1 > Write.MAX_NUM_FILES
+ || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
currResults = Lists.newArrayList();
currNumFiles = 0;
@@ -2331,13 +2280,13 @@ public class BigQueryIO {
String projectId = ref.getProjectId();
Job lastFailedLoadJob = null;
- for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+ for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
- Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
@@ -2361,7 +2310,7 @@ public class BigQueryIO {
"Failed to create load job with id prefix %s, "
+ "reached max retries: %d, last failed load job: %s.",
jobIdPrefix,
- Bound.MAX_RETRY_JOBS,
+ Write.MAX_RETRY_JOBS,
jobToPrettyString(lastFailedLoadJob)));
}
@@ -2477,13 +2426,13 @@ public class BigQueryIO {
String projectId = ref.getProjectId();
Job lastFailedCopyJob = null;
- for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+ for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startCopyJob(jobRef, copyConfig);
- Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = parseStatus(copyJob);
switch (jobStatus) {
case SUCCEEDED:
@@ -2507,7 +2456,7 @@ public class BigQueryIO {
"Failed to create copy job with id prefix %s, "
+ "reached max retries: %d, last failed copy job: %s.",
jobIdPrefix,
- Bound.MAX_RETRY_JOBS,
+ Write.MAX_RETRY_JOBS,
jobToPrettyString(lastFailedCopyJob)));
}
@@ -2536,9 +2485,6 @@ public class BigQueryIO {
.withLabel("Create Disposition"));
}
}
-
- /** Disallow construction of utility class. */
- private Write() {}
}
private static String jobToPrettyString(@Nullable Job job) throws IOException {
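
Taken together, these hunks fold the old nested Bound type into Write itself. A before/after usage sketch, with a placeholder table spec:

    // Before this change the factory chain produced a nested type:
    //   BigQueryIO.Write.Bound write = BigQueryIO.Write.to("project:dataset.table");
    // After it, the same chain yields BigQueryIO.Write directly:
    BigQueryIO.Write write = BigQueryIO.Write
        .to("project:dataset.table")
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withoutValidation();

The test updates below make exactly this substitution.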
http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bb1528b..f403c5a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -690,11 +690,11 @@ public class BigQueryIOTest implements Serializable {
}
private void checkWriteObject(
- BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+ BigQueryIO.Write write, String project, String dataset, String table,
TableSchema schema, CreateDisposition createDisposition,
WriteDisposition writeDisposition, String tableDescription) {
checkWriteObjectWithValidate(
- bound,
+ write,
project,
dataset,
table,
@@ -706,17 +706,17 @@ public class BigQueryIOTest implements Serializable {
}
private void checkWriteObjectWithValidate(
- BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+ BigQueryIO.Write write, String project, String dataset, String table,
TableSchema schema, CreateDisposition createDisposition,
WriteDisposition writeDisposition, String tableDescription, boolean validate) {
- assertEquals(project, bound.getTable().get().getProjectId());
- assertEquals(dataset, bound.getTable().get().getDatasetId());
- assertEquals(table, bound.getTable().get().getTableId());
- assertEquals(schema, bound.getSchema());
- assertEquals(createDisposition, bound.createDisposition);
- assertEquals(writeDisposition, bound.writeDisposition);
- assertEquals(tableDescription, bound.tableDescription);
- assertEquals(validate, bound.validate);
+ assertEquals(project, write.getTable().get().getProjectId());
+ assertEquals(dataset, write.getTable().get().getDatasetId());
+ assertEquals(table, write.getTable().get().getTableId());
+ assertEquals(schema, write.getSchema());
+ assertEquals(createDisposition, write.createDisposition);
+ assertEquals(writeDisposition, write.writeDisposition);
+ assertEquals(tableDescription, write.tableDescription);
+ assertEquals(validate, write.validate);
}
@Before
@@ -1328,10 +1328,10 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWrite() {
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@@ -1355,7 +1355,7 @@ public class BigQueryIOTest implements Serializable {
options.as(StreamingOptions.class).setStreaming(streaming);
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("project:dataset.table")
.withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
.withTestServices(new FakeBigQueryServices()
@@ -1375,10 +1375,10 @@ public class BigQueryIOTest implements Serializable {
public void testBuildWriteWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
checkWriteObjectWithValidate(
- bound,
+ write,
"foo.com:project",
"somedataset",
"sometable",
@@ -1391,9 +1391,9 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteDefaultProject() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
+ BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
checkWriteObject(
- bound, null, "somedataset", "sometable",
+ write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@@ -1403,89 +1403,80 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
+ BigQueryIO.Write write = BigQueryIO.Write.to(table);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
- @Category(NeedsRunner.class)
- public void testBuildWriteWithoutTable() {
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("must set the table reference");
- p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation());
- }
-
- @Test
public void testBuildWriteWithSchema() {
TableSchema schema = new TableSchema();
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithCreateDispositionNever() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithCreateDispositionIfNeeded() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithWriteDispositionTruncate() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
}
@Test
public void testBuildWriteWithWriteDispositionAppend() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
}
@Test
public void testBuildWriteWithWriteDispositionEmpty() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithWriteWithTableDescription() {
final String tblDescription = "foo bar table";
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withTableDescription(tblDescription);
checkWriteObject(
- bound,
+ write,
"foo.com:project",
"somedataset",
"sometable",
@@ -1501,7 +1492,7 @@ public class BigQueryIOTest implements Serializable {
TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
final String tblDescription = "foo bar table";
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to(tableSpec)
.withSchema(schema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1702,35 +1693,6 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testWriteValidateFailsTableAndTableSpec() {
- p.enableAbandonedNodeEnforcement(false);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Cannot set both a table reference and a table function");
- p
- .apply(Create.empty(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write
- .to("dataset.table")
- .to(new SerializableFunction<BoundedWindow, String>() {
- @Override
- public String apply(BoundedWindow input) {
- return null;
- }
- }));
- }
-
- @Test
- public void testWriteValidateFailsNoTableAndNoTableSpec() {
- p.enableAbandonedNodeEnforcement(false);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
- p
- .apply(Create.empty(TableRowJsonCoder.of()))
- .apply("name", BigQueryIO.Write.withoutValidation());
- }
-
- @Test
public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(mockJobService)
@@ -2094,7 +2056,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionSinglePartition() throws Exception {
- long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+ long numFiles = BigQueryIO.Write.MAX_NUM_FILES;
long fileSize = 1;
// One partition is needed.
@@ -2104,7 +2066,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionManyFiles() throws Exception {
- long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+ long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3;
long fileSize = 1;
// One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
@@ -2115,7 +2077,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionLargeFileSize() throws Exception {
long numFiles = 10;
- long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+ long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3;
// One partition is needed for each group of three files.
long expectedNumPartitions = 4;
@@ -2382,7 +2344,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to(options.getOutputTable())
.withSchema(NestedValueProvider.of(
options.getOutputSchema(), new JsonSchemaToTableSchema()))
[05/10] beam git commit: Use AutoValue for BigQueryIO.Read
Posted by tg...@apache.org.
Use AutoValue for BigQueryIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32cba321
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32cba321
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32cba321
Branch: refs/heads/master
Commit: 32cba321c04c5e3fb18856c84ea10b3513264dd5
Parents: d6b3e11
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:51:04 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 215 +++++++------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 +-
2 files changed, 84 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
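For readers unfamiliar with the idiom, AutoValue replaces the hand-written fields and copy-constructors below with an abstract value class plus a generated builder. A minimal, hedged sketch of the pattern on a made-up Range type (not Beam API):

    import com.google.auto.value.AutoValue;

    @AutoValue
    abstract class Range {
      abstract int getLow();
      abstract int getHigh();
      abstract Builder toBuilder();

      @AutoValue.Builder
      abstract static class Builder {
        abstract Builder setLow(int low);
        abstract Builder setHigh(int high);
        abstract Range build();
      }

      static Range of(int low, int high) {
        return new AutoValue_Range.Builder().setLow(low).setHigh(high).build();
      }

      // A "wither" in the style of Read.withoutValidation(): copy with one field changed.
      Range withHigh(int high) {
        return toBuilder().setHigh(high).build();
      }
    }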
http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 90d7f67..e5db60e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -36,9 +36,11 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -453,97 +455,86 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
- @Nullable final ValueProvider<String> jsonTableRef;
- @Nullable final ValueProvider<String> query;
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+ @Nullable abstract ValueProvider<String> getJsonTableRef();
+ @Nullable abstract ValueProvider<String> getQuery();
+ abstract boolean getValidate();
+ @Nullable abstract Boolean getFlattenResults();
+ @Nullable abstract Boolean getUseLegacySql();
+ @Nullable abstract BigQueryServices getBigQueryServices();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+ abstract Builder setQuery(ValueProvider<String> query);
+ abstract Builder setValidate(boolean validate);
+ abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
+ abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
+ abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+
+ abstract Read build();
+ }
+
+ private static Builder builder() {
+ return new AutoValue_BigQueryIO_Read.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl());
+ }
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"} for tables within the current project.
*/
public static Read from(String tableSpec) {
- return new Read().from(StaticValueProvider.of(tableSpec));
+ return from(StaticValueProvider.of(tableSpec));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
public static Read from(ValueProvider<String> tableSpec) {
- return new Read().from(tableSpec);
+ return builder()
+ .setJsonTableRef(
+ NestedValueProvider.of(
+ NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+ new TableRefToJson())).build();
}
/**
* Reads results received after executing the given query.
*/
public static Read fromQuery(String query) {
- return new Read().fromQuery(StaticValueProvider.of(query));
+ return fromQuery(StaticValueProvider.of(query));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
public static Read fromQuery(ValueProvider<String> query) {
- return new Read().fromQuery(query);
+ return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
}
/**
* Reads a BigQuery table specified as a {@link TableReference} object.
*/
public static Read from(TableReference table) {
- return new Read().from(table);
+ return from(StaticValueProvider.of(toTableSpec(table)));
}
- /**
- * Disable validation that the table exists or the query succeeds prior to pipeline
- * submission. Basic validation (such as ensuring that a query or table is specified) still
- * occurs.
- */
- final boolean validate;
- @Nullable final Boolean flattenResults;
- @Nullable final Boolean useLegacySql;
- @Nullable BigQueryServices bigQueryServices;
-
- @VisibleForTesting @Nullable String stepUuid;
- @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";
- private Read() {
- this(
- null /* name */,
- null /* query */,
- null /* jsonTableRef */,
- true /* validate */,
- null /* flattenResults */,
- null /* useLegacySql */,
- null /* bigQueryServices */);
- }
-
- private Read(
- String name, @Nullable ValueProvider<String> query,
- @Nullable ValueProvider<String> jsonTableRef, boolean validate,
- @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.query = query;
- this.validate = validate;
- this.flattenResults = flattenResults;
- this.useLegacySql = useLegacySql;
- this.bigQueryServices = bigQueryServices;
- }
-
/**
* Disable validation that the table exists or the query succeeds prior to pipeline
* submission. Basic validation (such as ensuring that a query or table is specified) still
* occurs.
*/
public Read withoutValidation() {
- return new Read(
- name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
- bigQueryServices);
+ return toBuilder().setValidate(false).build();
}
/**
@@ -554,9 +545,7 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Read withoutResultFlattening() {
- return new Read(
- name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
- bigQueryServices);
+ return toBuilder().setFlattenResults(false).build();
}
/**
@@ -566,15 +555,12 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Read usingStandardSql() {
- return new Read(
- name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
- bigQueryServices);
+ return toBuilder().setUseLegacySql(false).build();
}
@VisibleForTesting
Read withTestServices(BigQueryServices testServices) {
- return new Read(
- name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+ return toBuilder().setBigQueryServices(testServices).build();
}
@Override
@@ -587,7 +573,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Read needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
+ if (getBigQueryServices() == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -602,63 +588,65 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
checkState(
- table == null || query == null,
+ table == null || getQuery() == null,
"Invalid BigQueryIO.Read: table reference and query may not both be set");
checkState(
- table != null || query != null,
+ table != null || getQuery() != null,
"Invalid BigQueryIO.Read: one of table reference and query must be set");
if (table != null) {
checkState(
- flattenResults == null,
+ getFlattenResults() == null,
"Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+ " preference, which only applies to queries");
checkState(
- useLegacySql == null,
+ getUseLegacySql() == null,
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ " preference, which only applies to queries");
} else /* query != null */ {
- checkState(flattenResults != null, "flattenResults should not be null if query is set");
- checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+ checkState(
+ getFlattenResults() != null, "flattenResults should not be null if query is set");
+ checkState(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
}
// Note that a table or query check can fail if the table or dataset are created by
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
- if (validate && table != null) {
+ if (getValidate() && table != null) {
checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
// Check for source table presence for early failure notification.
DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
verifyDatasetPresence(datasetService, table.get());
verifyTablePresence(datasetService, table.get());
- } else if (validate && query != null) {
- checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+ } else if (getValidate() && getQuery() != null) {
+ checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
jobService.dryRunQuery(
bqOptions.getProject(),
new JobConfigurationQuery()
- .setQuery(query.get())
- .setFlattenResults(flattenResults)
- .setUseLegacySql(useLegacySql));
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()));
} catch (Exception e) {
throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
}
}
}
@Override
public PCollection<TableRow> expand(PBegin input) {
- stepUuid = randomUUIDString();
+ String stepUuid = randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
- jobUuid = NestedValueProvider.of(
+ ValueProvider<String> jobUuid = NestedValueProvider.of(
StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
final ValueProvider<String> jobIdToken = NestedValueProvider.of(
jobUuid, new BeamJobUuidToBigQueryJobUuid());
BoundedSource<TableRow> source;
- final BigQueryServices bqServices = getBigQueryServices();
+ final BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
@@ -671,11 +659,18 @@ public class BigQueryIO {
}
final String executingProject = bqOptions.getProject();
- if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
- source = BigQueryQuerySource.create(
- jobIdToken, query, NestedValueProvider.of(
- jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
- flattenResults, useLegacySql, extractDestinationDir, bqServices);
+ if (getQuery() != null
+ && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
+ source =
+ BigQueryQuerySource.create(
+ jobIdToken,
+ getQuery(),
+ NestedValueProvider.of(
+ jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+ getFlattenResults(),
+ getUseLegacySql(),
+ extractDestinationDir,
+ bqServices);
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
source = BigQueryTableSource.create(
@@ -726,13 +721,13 @@ public class BigQueryIO {
builder
.addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
.withLabel("Table"))
- .addIfNotNull(DisplayData.item("query", query)
+ .addIfNotNull(DisplayData.item("query", getQuery())
.withLabel("Query"))
- .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+ .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults())
.withLabel("Flatten Query Results"))
- .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+ .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql())
.withLabel("Use Legacy SQL Dialect"))
- .addIfNotDefault(DisplayData.item("validation", validate)
+ .addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"),
true);
}
@@ -769,8 +764,8 @@ public class BigQueryIO {
*/
@Nullable
public ValueProvider<TableReference> getTableProvider() {
- return jsonTableRef == null
- ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ return getJsonTableRef() == null
+ ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
/**
* Returns the table to read, or {@code null} if reading from a query instead.
@@ -780,52 +775,6 @@ public class BigQueryIO {
ValueProvider<TableReference> provider = getTableProvider();
return provider == null ? null : provider.get();
}
-
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public String getQuery() {
- return query == null ? null : query.get();
- }
-
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public ValueProvider<String> getQueryProvider() {
- return query;
- }
-
- /**
- * Returns true if table validation is enabled.
- */
- public boolean getValidate() {
- return validate;
- }
-
- /**
- * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
- */
- public Boolean getFlattenResults() {
- return flattenResults;
- }
-
- /**
- * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
- * if not applicable.
- */
- @Nullable
- public Boolean getUseLegacySql() {
- return useLegacySql;
- }
-
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
- }
}
/**
@@ -1863,7 +1812,7 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- String stepUuid = randomUUIDString();
+ final String stepUuid = randomUUIDString();
String tempLocation = options.getTempLocation();
String tempFilePrefix;
@@ -1886,7 +1835,7 @@ public class BigQueryIO {
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
- return randomUUIDString();
+ return stepUuid;
}
}))
.apply(View.<String>asSingleton());
http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f403c5a..fdaa81c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -678,14 +677,14 @@ public class BigQueryIOTest implements Serializable {
assertEquals(project, read.getTable().getProjectId());
assertEquals(dataset, read.getTable().getDatasetId());
assertEquals(table, read.getTable().getTableId());
- assertNull(read.query);
+ assertNull(read.getQuery());
assertEquals(validate, read.getValidate());
}
private void checkReadQueryObjectWithValidate(
BigQueryIO.Read read, String query, boolean validate) {
assertNull(read.getTable());
- assertEquals(query, read.getQuery());
+ assertEquals(query, read.getQuery().get());
assertEquals(validate, read.getValidate());
}
@@ -2433,20 +2432,4 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.TableRowInfoCoder.of()),
IntervalWindow.getCoder()));
}
-
- @Test
- public void testUniqueStepIdRead() {
- RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- Pipeline pipeline = TestPipeline.create(options);
- bqOptions.setTempLocation("gs://testbucket/testdir");
- BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
- options.getInputQuery()).withoutValidation();
- pipeline.apply(read1);
- BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
- options.getInputQuery()).withoutValidation();
- pipeline.apply(read2);
- assertNotEquals(read1.stepUuid, read2.stepUuid);
- assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
- }
}
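
After this refactor, configuration is read back through generated accessors, as the updated assertions above show. A short, hedged usage sketch against the new surface; the query text is a placeholder.

    BigQueryIO.Read read = BigQueryIO.Read
        .fromQuery("SELECT word, word_count FROM [dataset.table]")
        .withoutValidation();
    // State now lives behind AutoValue getters rather than package-visible fields:
    //   read.getQuery()          -> ValueProvider<String> holding the query
    //   read.getValidate()       -> false
    //   read.getFlattenResults() -> true  (set by fromQuery)
    //   read.getUseLegacySql()   -> true  (set by fromQuery)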
[07/10] beam git commit: Simplify configuration of StreamWithDedup
Posted by tg...@apache.org.
Simplify configuration of StreamWithDedup
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1adcbaea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1adcbaea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1adcbaea
Branch: refs/heads/master
Commit: 1adcbaea799e83016ec91f7b7155c3a25804ce6c
Parents: 5c71589
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:19:51 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:32 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 73 +++++++-------------
1 file changed, 26 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
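The simplification here is that the builder now seeds getBigQueryServices() with a real implementation, so call sites drop their null-handling. In outline, lightly rearranged from this commit and the preceding AutoValue commit:

    // Before: every call site defended against a null services field.
    //   BigQueryServices bqServices =
    //       MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
    // After: the AutoValue builder supplies the default once,
    //   return new AutoValue_BigQueryIO_Read.Builder()
    //       .setValidate(true)
    //       .setBigQueryServices(new BigQueryServicesImpl());
    // and getBigQueryServices() is declared non-null and used directly.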
http://git-wip-us.apache.org/repos/asf/beam/blob/1adcbaea/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d2f6ba6..e039c8c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -40,7 +40,6 @@ import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -462,7 +461,7 @@ public class BigQueryIO {
abstract boolean getValidate();
@Nullable abstract Boolean getFlattenResults();
@Nullable abstract Boolean getUseLegacySql();
- @Nullable abstract BigQueryServices getBigQueryServices();
+ abstract BigQueryServices getBigQueryServices();
abstract Builder toBuilder();
@@ -645,8 +644,6 @@ public class BigQueryIO {
jobUuid, new BeamJobUuidToBigQueryJobUuid());
BoundedSource<TableRow> source;
- final BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
@@ -670,11 +667,11 @@ public class BigQueryIO {
getFlattenResults(),
getUseLegacySql(),
extractDestinationDir,
- bqServices);
+ getBigQueryServices());
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
source = BigQueryTableSource.create(
- jobIdToken, inputTable, extractDestinationDir, bqServices,
+ jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
StaticValueProvider.of(executingProject));
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
@@ -687,7 +684,7 @@ public class BigQueryIO {
.setProjectId(executingProject)
.setJobId(getExtractJobId(jobIdToken));
- Job extractJob = bqServices.getJobService(bqOptions)
+ Job extractJob = getBigQueryServices().getJobService(bqOptions)
.getJob(jobRef);
Collection<String> extractFiles = null;
@@ -1390,7 +1387,7 @@ public class BigQueryIO {
@Nullable abstract String getTableDescription();
/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();
- @Nullable abstract BigQueryServices getBigQueryServices();
+ abstract BigQueryServices getBigQueryServices();
abstract Builder toBuilder();
@@ -1650,12 +1647,10 @@ public class BigQueryIO {
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
// The user specified a table.
- BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
if (getJsonTableRef() != null && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = bqServices.getDatasetService(options);
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1693,7 +1688,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (bqServices == null) {
+ if (getBigQueryServices() == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -1711,19 +1706,11 @@ public class BigQueryIO {
public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices =
- MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
- return input.apply(
- new StreamWithDeDup(getTable(), getTableRefFunction(),
- getJsonSchema() == null ? null : NestedValueProvider.of(
- getJsonSchema(), new JsonSchemaToTableSchema()),
- getCreateDisposition(),
- getTableDescription(),
- bqServices));
+ return input.apply(new StreamWithDeDup(this));
}
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -1786,7 +1773,7 @@ public class BigQueryIO {
.apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
.apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
false,
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
@@ -1800,7 +1787,7 @@ public class BigQueryIO {
.apply("TempTablesView", View.<String>asIterable());
singleton.apply(ParDo
.of(new WriteRename(
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
NestedValueProvider.of(table, new TableRefToJson()),
getWriteDisposition(),
@@ -1814,7 +1801,7 @@ public class BigQueryIO {
.apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
.apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
true,
- bqServices,
+ getBigQueryServices(),
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
@@ -2740,25 +2727,11 @@ public class BigQueryIO {
* it leverages BigQuery best effort de-dup mechanism.
*/
private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
- @Nullable private final transient ValueProvider<TableReference> tableReference;
- @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
- @Nullable private final transient ValueProvider<TableSchema> tableSchema;
- private final Write.CreateDisposition createDisposition;
- private final BigQueryServices bqServices;
- @Nullable private final String tableDescription;
+ private final Write write;
/** Constructor. */
- StreamWithDeDup(ValueProvider<TableReference> tableReference,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<TableSchema> tableSchema,
- Write.CreateDisposition createDisposition,
- @Nullable String tableDescription, BigQueryServices bqServices) {
- this.tableReference = tableReference;
- this.tableRefFunction = tableRefFunction;
- this.tableSchema = tableSchema;
- this.createDisposition = createDisposition;
- this.bqServices = checkNotNull(bqServices, "bqServices");
- this.tableDescription = tableDescription;
+ StreamWithDeDup(Write write) {
+ this.write = write;
}
@Override
@@ -2780,20 +2753,26 @@ public class BigQueryIO {
PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged = input.apply(ParDo.of(
new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class),
- tableReference, tableRefFunction)));
+ write.getTable(), write.getTableRefFunction())));
// To prevent having the same TableRow processed more than once with regenerated
// different unique ids, this implementation relies on "checkpointing", which is
// achieved as a side effect of having StreamingWriteFn immediately follow a GBK,
// performed by Reshuffle.
+ NestedValueProvider<TableSchema, String> schema =
+ write.getJsonSchema() == null
+ ? null
+ : NestedValueProvider.of(write.getJsonSchema(), new JsonSchemaToTableSchema());
tagged
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
.apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
- .apply(ParDo.of(new StreamingWriteFn(
- tableSchema,
- createDisposition,
- tableDescription,
- bqServices)));
+ .apply(
+ ParDo.of(
+ new StreamingWriteFn(
+ schema,
+ write.getCreateDisposition(),
+ write.getTableDescription(),
+ write.getBigQueryServices())));
// Note that the implementation to return PDone here breaks the
// implicit assumption about the job execution order. If a user
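
The net effect of this commit: StreamWithDeDup no longer copies six configuration fields through its constructor; it holds the configured Write and reads each option through the AutoValue getters at expansion time. A self-contained sketch of the pattern under toy names (not Beam code):

// Toy illustration of the refactoring pattern: the helper takes the
// fully-configured parent object and reads settings on demand,
// instead of duplicating each field in its own constructor.
class Config {
  private final String table;
  private final boolean validate;

  Config(String table, boolean validate) {
    this.table = table;
    this.validate = validate;
  }

  String getTable() { return table; }
  boolean getValidate() { return validate; }
}

class Writer {
  private final Config config;  // single reference replaces N copied fields

  Writer(Config config) { this.config = config; }

  void run() {
    System.out.println(
        "writing to " + config.getTable() + ", validate=" + config.getValidate());
  }
}

The design choice trades slightly wider coupling (the helper sees all of Write) for removing the constructor plumbing that every newly added option previously had to thread through.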
[06/10] beam git commit: Use AutoValue for BigQueryIO.Write
Posted by tg...@apache.org.
Use AutoValue for BigQueryIO.Write
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c715896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c715896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c715896
Branch: refs/heads/master
Commit: 5c715896e683f7bec9f150ea17c78d1dae00ee4c
Parents: 32cba32
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:14:39 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:30 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 314 ++++++-------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 8 +-
2 files changed, 108 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e5db60e..d2f6ba6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -471,9 +471,9 @@ public class BigQueryIO {
abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
abstract Builder setQuery(ValueProvider<String> query);
abstract Builder setValidate(boolean validate);
- abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
- abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
- abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+ abstract Builder setFlattenResults(Boolean flattenResults);
+ abstract Builder setUseLegacySql(Boolean useLegacySql);
+ abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
abstract Read build();
}
@@ -1364,10 +1364,13 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ @VisibleForTesting
// Maximum number of files in a single partition.
static final int MAX_NUM_FILES = 10000;
+ @VisibleForTesting
// Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
static final long MAX_SIZE_BYTES = 11 * (1L << 40);
@@ -1378,28 +1381,41 @@ public class BigQueryIO {
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
- @Nullable private final ValueProvider<String> jsonTableRef;
-
- @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
- // Table schema. The schema is required only if the table does not exist.
- @Nullable private final ValueProvider<String> jsonSchema;
-
- // Options for creating the table. Valid values are CREATE_IF_NEEDED and
- // CREATE_NEVER.
- final CreateDisposition createDisposition;
+ @Nullable abstract ValueProvider<String> getJsonTableRef();
+ @Nullable abstract SerializableFunction<BoundedWindow, TableReference> getTableRefFunction();
+ /** Table schema. The schema is required only if the table does not exist. */
+ @Nullable abstract ValueProvider<String> getJsonSchema();
+ abstract CreateDisposition getCreateDisposition();
+ abstract WriteDisposition getWriteDisposition();
+ @Nullable abstract String getTableDescription();
+ /** An option to indicate if table validation is desired. Default is true. */
+ abstract boolean getValidate();
+ @Nullable abstract BigQueryServices getBigQueryServices();
- // Options for writing to the table. Valid values are WRITE_TRUNCATE,
- // WRITE_APPEND and WRITE_EMPTY.
- final WriteDisposition writeDisposition;
+ abstract Builder toBuilder();
- @Nullable
- final String tableDescription;
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+ abstract Builder setTableRefFunction(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction);
+ abstract Builder setJsonSchema(ValueProvider<String> jsonSchema);
+ abstract Builder setCreateDisposition(CreateDisposition createDisposition);
+ abstract Builder setWriteDisposition(WriteDisposition writeDisposition);
+ abstract Builder setTableDescription(String tableDescription);
+ abstract Builder setValidate(boolean validate);
+ abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
- // An option to indicate if table validation is desired. Default is true.
- final boolean validate;
+ abstract Write build();
+ }
- @Nullable private BigQueryServices bigQueryServices;
+ private static Builder builder() {
+ return new AutoValue_BigQueryIO_Write.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
+ }
/**
* An enumeration type for the BigQuery create disposition strings.
@@ -1474,17 +1490,22 @@ public class BigQueryIO {
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
*/
public static Write to(String tableSpec) {
- return new Write().withTableSpec(tableSpec);
+ return to(StaticValueProvider.of(tableSpec));
}
/** Creates a write transformation for the given table. */
- public static Write to(ValueProvider<String> tableSpec) {
- return new Write().withTableSpec(tableSpec);
+ public static Write to(TableReference table) {
+ return to(StaticValueProvider.of(toTableSpec(table)));
}
/** Creates a write transformation for the given table. */
- public static Write to(TableReference table) {
- return new Write().withTableRef(table);
+ public static Write to(ValueProvider<String> tableSpec) {
+ return builder()
+ .setJsonTableRef(
+ NestedValueProvider.of(
+ NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+ new TableRefToJson()))
+ .build();
}
/**
@@ -1499,7 +1520,7 @@ public class BigQueryIO {
* always return the same table specification.
*/
public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return new Write().withTableSpec(tableSpecFunction);
+ return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
}
/**
@@ -1511,7 +1532,7 @@ public class BigQueryIO {
*/
private static Write toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Write().withTableRef(tableRefFunction);
+ return builder().setTableRefFunction(tableRefFunction).build();
}
private static class TranslateTableSpecFunction implements
@@ -1528,109 +1549,6 @@ public class BigQueryIO {
}
}
- private Write() {
- this(
- null /* name */,
- null /* jsonTableRef */,
- null /* tableRefFunction */,
- null /* jsonSchema */,
- CreateDisposition.CREATE_IF_NEEDED,
- WriteDisposition.WRITE_EMPTY,
- null /* tableDescription */,
- true /* validate */,
- null /* bigQueryServices */);
- }
-
- private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<String> jsonSchema,
- CreateDisposition createDisposition,
- WriteDisposition writeDisposition,
- @Nullable String tableDescription,
- boolean validate,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.tableRefFunction = tableRefFunction;
- this.jsonSchema = jsonSchema;
- this.createDisposition = checkNotNull(createDisposition, "createDisposition");
- this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
- this.tableDescription = tableDescription;
- this.validate = validate;
- this.bigQueryServices = bigQueryServices;
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- private Write withTableSpec(String tableSpec) {
- return withTableRef(NestedValueProvider.of(
- StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableRef(TableReference table) {
- return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Write withTableSpec(ValueProvider<String> tableSpec) {
- return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- private Write withTableRef(ValueProvider<TableReference> table) {
- return new Write(name,
- NestedValueProvider.of(table, new TableRefToJson()),
- tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
-
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
- * should always return the same table specification.
- */
- private Write withTableSpec(
- SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
- }
-
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
- * always return the same table reference.
- */
- private Write withTableRef(
- SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
-
/**
* Returns a copy of this write transformation, but using the specified schema for rows
* to be written.
@@ -1638,18 +1556,18 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withSchema(TableSchema schema) {
- return new Write(name, jsonTableRef, tableRefFunction,
- StaticValueProvider.of(toJsonString(schema)),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder()
+ .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+ .build();
}
/**
* Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
*/
public Write withSchema(ValueProvider<TableSchema> schema) {
- return new Write(name, jsonTableRef, tableRefFunction,
- NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder()
+ .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()))
+ .build();
}
/**
@@ -1658,8 +1576,7 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withCreateDisposition(CreateDisposition createDisposition) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder().setCreateDisposition(createDisposition).build();
}
/**
@@ -1668,8 +1585,7 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withWriteDisposition(WriteDisposition writeDisposition) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder().setWriteDisposition(writeDisposition).build();
}
/**
@@ -1677,9 +1593,8 @@ public class BigQueryIO {
*
* <p>Does not modify this object.
*/
- public Write withTableDescription(@Nullable String tableDescription) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ public Write withTableDescription(String tableDescription) {
+ return toBuilder().setTableDescription(tableDescription).build();
}
/**
@@ -1688,14 +1603,12 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withoutValidation() {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, false, bigQueryServices);
+ return toBuilder().setValidate(false).build();
}
@VisibleForTesting
Write withTestServices(BigQueryServices testServices) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, testServices);
+ return toBuilder().setBigQueryServices(testServices).build();
}
private static void verifyTableNotExistOrEmpty(
@@ -1724,23 +1637,25 @@ public class BigQueryIO {
// Exactly one of the table and table reference can be configured.
checkState(
- jsonTableRef != null || tableRefFunction != null,
+ getJsonTableRef() != null || getTableRefFunction() != null,
"must set the table reference of a BigQueryIO.Write transform");
checkState(
- jsonTableRef == null || tableRefFunction == null,
+ getJsonTableRef() == null || getTableRefFunction() == null,
"Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ " transform");
// Require a schema if creating one or more tables.
checkArgument(
- createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null,
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
// The user specified a table.
- if (jsonTableRef != null && validate) {
+ BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
+ if (getJsonTableRef() != null && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ DatasetService datasetService = bqServices.getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1754,22 +1669,22 @@ public class BigQueryIO {
}
}
- if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
// We will use BigQuery's streaming write API -- validate supported dispositions.
- if (tableRefFunction != null) {
+ if (getTableRefFunction() != null) {
checkArgument(
- createDisposition != CreateDisposition.CREATE_NEVER,
+ getCreateDisposition() != CreateDisposition.CREATE_NEVER,
"CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+ " function.");
}
- if (jsonSchema == null) {
+ if (getJsonSchema() == null) {
checkArgument(
- createDisposition == CreateDisposition.CREATE_NEVER,
+ getCreateDisposition() == CreateDisposition.CREATE_NEVER,
"CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
}
checkArgument(
- writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ " when using a tablespec function.");
} else {
@@ -1778,7 +1693,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
+ if (bqServices == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -1796,17 +1711,18 @@ public class BigQueryIO {
public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices = getBigQueryServices();
+ BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
- if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+ if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
return input.apply(
- new StreamWithDeDup(getTable(), tableRefFunction,
- jsonSchema == null ? null : NestedValueProvider.of(
- jsonSchema, new JsonSchemaToTableSchema()),
- createDisposition,
- tableDescription,
+ new StreamWithDeDup(getTable(), getTableRefFunction(),
+ getJsonSchema() == null ? null : NestedValueProvider.of(
+ getJsonSchema(), new JsonSchemaToTableSchema()),
+ getCreateDisposition(),
+ getTableDescription(),
bqServices));
}
@@ -1874,10 +1790,10 @@ public class BigQueryIO {
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
+ getJsonSchema(),
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
- tableDescription))
+ getTableDescription()))
.withSideInputs(jobIdTokenView));
PCollectionView<Iterable<String>> tempTablesView = tempTables
@@ -1887,10 +1803,10 @@ public class BigQueryIO {
bqServices,
jobIdTokenView,
NestedValueProvider.of(table, new TableRefToJson()),
- writeDisposition,
- createDisposition,
+ getWriteDisposition(),
+ getCreateDisposition(),
tempTablesView,
- tableDescription))
+ getTableDescription()))
.withSideInputs(tempTablesView, jobIdTokenView));
// Write single partition to final table
@@ -1902,10 +1818,10 @@ public class BigQueryIO {
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- writeDisposition,
- createDisposition,
- tableDescription))
+ getJsonSchema(),
+ getWriteDisposition(),
+ getCreateDisposition(),
+ getTableDescription()))
.withSideInputs(jobIdTokenView));
return PDone.in(input.getPipeline());
@@ -1969,41 +1885,31 @@ public class BigQueryIO {
super.populateDisplayData(builder);
builder
- .addIfNotNull(DisplayData.item("table", jsonTableRef)
+ .addIfNotNull(DisplayData.item("table", getJsonTableRef())
.withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .addIfNotNull(DisplayData.item("schema", getJsonSchema())
.withLabel("Table Schema"));
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ if (getTableRefFunction() != null) {
+ builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
.withLabel("Table Reference Function"));
}
builder
- .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .add(DisplayData.item("createDisposition", getCreateDisposition().toString())
.withLabel("Table CreateDisposition"))
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .add(DisplayData.item("writeDisposition", getWriteDisposition().toString())
.withLabel("Table WriteDisposition"))
- .addIfNotDefault(DisplayData.item("validation", validate)
+ .addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+ .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
.withLabel("Table Description"));
}
- /** Returns the create disposition. */
- public CreateDisposition getCreateDisposition() {
- return createDisposition;
- }
-
- /** Returns the write disposition. */
- public WriteDisposition getWriteDisposition() {
- return writeDisposition;
- }
-
/** Returns the table schema. */
public TableSchema getSchema() {
return fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
}
/**
@@ -2036,20 +1942,8 @@ public class BigQueryIO {
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
- return jsonTableRef == null ? null :
- NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
- }
-
- /** Returns {@code true} if table validation is enabled. */
- public boolean getValidate() {
- return validate;
- }
-
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
+ return getJsonTableRef() == null ? null :
+ NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
static class TableRowWriter {
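
The getTable() rewrite above keeps the table reference stored as a JSON string and derives the TableReference lazily through NestedValueProvider. A minimal runnable sketch of that idiom, with a toy translation function standing in for Beam's JsonTableRefToTableRef:

import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class NestedValueProviderSketch {
  public static void main(String[] args) {
    // A provider of one type, wrapped with a SerializableFunction that
    // derives another type; the function runs when get() is called.
    ValueProvider<String> spec = StaticValueProvider.of("project:dataset.table");
    ValueProvider<Integer> length =
        NestedValueProvider.of(
            spec,
            new SerializableFunction<String, Integer>() {
              @Override
              public Integer apply(String s) {
                return s.length();
              }
            });
    System.out.println(length.get());  // prints 21
  }
}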
http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fdaa81c..888d9c1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -712,10 +712,10 @@ public class BigQueryIOTest implements Serializable {
assertEquals(dataset, write.getTable().get().getDatasetId());
assertEquals(table, write.getTable().get().getTableId());
assertEquals(schema, write.getSchema());
- assertEquals(createDisposition, write.createDisposition);
- assertEquals(writeDisposition, write.writeDisposition);
- assertEquals(tableDescription, write.tableDescription);
- assertEquals(validate, write.validate);
+ assertEquals(createDisposition, write.getCreateDisposition());
+ assertEquals(writeDisposition, write.getWriteDisposition());
+ assertEquals(tableDescription, write.getTableDescription());
+ assertEquals(validate, write.getValidate());
}
@Before
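
For readers unfamiliar with the pattern this commit introduces: @AutoValue generates the concrete value class and its builder, so each with*() method collapses to a toBuilder()...build() round trip instead of a hand-maintained nine-argument copy constructor. A self-contained sketch under assumed names (not Beam's actual class):

import com.google.auto.value.AutoValue;

@AutoValue
abstract class WriteConfig {
  abstract String getTableSpec();
  abstract boolean getValidate();

  abstract Builder toBuilder();

  static Builder builder() {
    // Defaults live in one place instead of a no-arg constructor.
    return new AutoValue_WriteConfig.Builder().setValidate(true);
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setTableSpec(String tableSpec);
    abstract Builder setValidate(boolean validate);
    abstract WriteConfig build();
  }

  // Mirrors the withoutValidation() rewrite in the diff above:
  // copy-with-one-change in a single line.
  WriteConfig withoutValidation() {
    return toBuilder().setValidate(false).build();
  }
}

Because the hand-written fields no longer exist after the migration, the test assertions above switch from direct field access (write.validate) to the generated getters (write.getValidate()).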