You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:02 UTC
[08/10] beam git commit: Replace BigQueryIO.Read.from() with
BigQueryIO.read().from()
Replace BigQueryIO.Read.from() with BigQueryIO.read().from()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a252a77
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a252a77
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a252a77
Branch: refs/heads/master
Commit: 1a252a771127febe551fda5d499c7ecb3b95cf23
Parents: 1adcbae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Mar 13 16:15:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:36 2017 -0700
----------------------------------------------------------------------
.../examples/cookbook/BigQueryTornadoes.java | 2 +-
.../cookbook/CombinePerKeyExamples.java | 2 +-
.../beam/examples/cookbook/FilterExamples.java | 2 +-
.../beam/examples/cookbook/JoinExamples.java | 4 +--
.../examples/cookbook/MaxPerKeyExamples.java | 2 +-
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 36 +++++++++++++-------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 36 ++++++++++----------
8 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 079674a..d3c9167 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -156,7 +156,7 @@ public class BigQueryTornadoes {
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new CountTornadoes())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index 37f9d79..fc54b13 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -200,7 +200,7 @@ public class CombinePerKeyExamples {
fields.add(new TableFieldSchema().setName("all_plays").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new PlaysForWord())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index fb6b507..714a8f2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -238,7 +238,7 @@ public class FilterExamples {
TableSchema schema = buildWeatherSchemaProjection();
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(ParDo.of(new ProjectionFn()))
.apply(new BelowGlobalMean(options.getMonthFilter()))
.apply(BigQueryIO.Write
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 7cf0942..05a3ad3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -166,8 +166,8 @@ public class JoinExamples {
Pipeline p = Pipeline.create(options);
// the following two 'applys' create multiple inputs to our pipeline, one for each
// of our two input sources.
- PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE));
- PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES));
+ PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
+ PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
formattedResults.apply(TextIO.Write.to(options.getOutput()));
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index eabc42b..7e7bc72 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -149,7 +149,7 @@ public class MaxPerKeyExamples {
fields.add(new TableFieldSchema().setName("max_mean_temp").setType("FLOAT"));
TableSchema schema = new TableSchema().setFields(fields);
- p.apply(BigQueryIO.Read.from(options.getInput()))
+ p.apply(BigQueryIO.read().from(options.getInput()))
.apply(new MaxMeanTemp())
.apply(BigQueryIO.Write
.to(options.getOutput())
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c4ff158..c65d7dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -24,7 +24,7 @@
* from existing storage:
* <pre>{@code
* PCollection<TableRow> inputData = pipeline.apply(
- * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
* and {@code Write} transforms that persist PCollections to external storage:
* <pre> {@code
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e039c8c..dfb7ea6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -166,7 +166,7 @@ import org.slf4j.LoggerFactory;
* This produces a {@link PCollection} of {@link TableRow TableRows} as output:
* <pre>{@code
* PCollection<TableRow> weatherData = pipeline.apply(
- * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ * BigQueryIO.read().from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
*
* <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -177,7 +177,7 @@ import org.slf4j.LoggerFactory;
*
* <pre>{@code
* PCollection<TableRow> meanTemperatureData = pipeline.apply(
- * BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
+ * BigQueryIO.read().fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
* }</pre>
*
* <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -454,6 +454,14 @@ public class BigQueryIO {
* }
* }}</pre>
*/
+ public static Read read() {
+ return new AutoValue_BigQueryIO_Read.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .build();
+ }
+
+ /** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
@Nullable abstract ValueProvider<String> getJsonTableRef();
@@ -477,25 +485,26 @@ public class BigQueryIO {
abstract Read build();
}
- private static Builder builder() {
- return new AutoValue_BigQueryIO_Read.Builder()
- .setValidate(true)
- .setBigQueryServices(new BigQueryServicesImpl());
+ /** Ensures that methods of the from() / fromQuery() family are called at most once. */
+ private void ensureFromNotCalledYet() {
+ checkState(
+ getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
}
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"} for tables within the current project.
*/
- public static Read from(String tableSpec) {
+ public Read from(String tableSpec) {
return from(StaticValueProvider.of(tableSpec));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
- public static Read from(ValueProvider<String> tableSpec) {
- return builder()
+ public Read from(ValueProvider<String> tableSpec) {
+ ensureFromNotCalledYet();
+ return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
@@ -505,21 +514,22 @@ public class BigQueryIO {
/**
* Reads results received after executing the given query.
*/
- public static Read fromQuery(String query) {
+ public Read fromQuery(String query) {
return fromQuery(StaticValueProvider.of(query));
}
/**
* Same as {@code fromQuery(String)}, but with a {@link ValueProvider}.
*/
- public static Read fromQuery(ValueProvider<String> query) {
- return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
+ public Read fromQuery(ValueProvider<String> query) {
+ ensureFromNotCalledYet();
+ return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
}
/**
* Reads a BigQuery table specified as a {@link TableReference} object.
*/
- public static Read from(TableReference table) {
+ public Read from(TableReference table) {
return from(StaticValueProvider.of(toTableSpec(table)));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1a252a77/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 888d9c1..f6a7fb4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -727,13 +727,13 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildTableBasedSource() {
- BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+ BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedSource() {
- BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+ BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query");
checkReadQueryObject(read, "foo_query");
}
@@ -742,7 +742,7 @@ public class BigQueryIOTest implements Serializable {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read =
- BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
+ BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation();
checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
}
@@ -751,14 +751,14 @@ public class BigQueryIOTest implements Serializable {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read =
- BigQueryIO.Read.fromQuery("some_query").withoutValidation();
+ BigQueryIO.read().fromQuery("some_query").withoutValidation();
checkReadQueryObjectWithValidate(read, "some_query", false);
}
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
BigQueryIO.Read read =
- BigQueryIO.Read.from("somedataset.sometable");
+ BigQueryIO.read().from("somedataset.sometable");
checkReadTableObject(read, null, "somedataset", "sometable");
}
@@ -768,7 +768,7 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Read read = BigQueryIO.Read.from(table);
+ BigQueryIO.Read read = BigQueryIO.read().from(table);
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@@ -800,7 +800,7 @@ public class BigQueryIOTest implements Serializable {
thrown.expect(RuntimeException.class);
// Message will be one of following depending on the execution environment.
thrown.expectMessage(Matchers.containsString("Unsupported"));
- p.apply(BigQueryIO.Read.from(tableRef)
+ p.apply(BigQueryIO.read().from(tableRef)
.withTestServices(fakeBqServices));
}
@@ -817,7 +817,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply("ReadMyTable",
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.withoutResultFlattening());
p.run();
@@ -836,7 +836,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply(
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.withoutValidation()
.withoutResultFlattening());
@@ -856,7 +856,7 @@ public class BigQueryIOTest implements Serializable {
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
+ " which only applies to queries");
p.apply(
- BigQueryIO.Read
+ BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.usingStandardSql());
p.run();
@@ -929,7 +929,7 @@ public class BigQueryIOTest implements Serializable {
Pipeline p = TestPipeline.create(bqOptions);
PCollection<String> output = p
- .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
+ .apply(BigQueryIO.read().from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
.apply(ParDo.of(new DoFn<TableRow, String>() {
@@ -1260,7 +1260,7 @@ public class BigQueryIOTest implements Serializable {
public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.from(tableSpec)
.withoutResultFlattening()
.usingStandardSql()
@@ -1276,7 +1276,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildSourceDisplayDataQuery() {
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.fromQuery("myQuery")
.withoutResultFlattening()
.usingStandardSql()
@@ -1295,7 +1295,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.from("project:dataset.tableId")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -1312,7 +1312,7 @@ public class BigQueryIOTest implements Serializable {
@Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- BigQueryIO.Read read = BigQueryIO.Read
+ BigQueryIO.Read read = BigQueryIO.read()
.fromQuery("foobar")
.withTestServices(new FakeBigQueryServices()
.withDatasetService(mockDatasetService)
@@ -1674,7 +1674,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBigQueryIOGetName() {
- assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
+ assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
}
@@ -2317,7 +2317,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read read = BigQueryIO.Read.from(
+ BigQueryIO.Read read = BigQueryIO.read().from(
options.getInputTable()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.
@@ -2330,7 +2330,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
+ BigQueryIO.Read read = BigQueryIO.read().fromQuery(
options.getInputQuery()).withoutValidation();
pipeline.apply(read);
// Test that this doesn't throw.