You are viewing a plain-text version of this message; the link to its canonical (HTML) version was lost in the conversion to plain text. The original commit is linked below.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:59 UTC
[05/10] beam git commit: Use AutoValue for BigQueryIO.Read
Use AutoValue for BigQueryIO.Read
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32cba321
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32cba321
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32cba321
Branch: refs/heads/master
Commit: 32cba321c04c5e3fb18856c84ea10b3513264dd5
Parents: d6b3e11
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:51:04 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:27 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 215 +++++++------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 21 +-
2 files changed, 84 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 90d7f67..e5db60e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -36,9 +36,11 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -453,97 +455,86 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
- @Nullable final ValueProvider<String> jsonTableRef;
- @Nullable final ValueProvider<String> query;
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+ @Nullable abstract ValueProvider<String> getJsonTableRef();
+ @Nullable abstract ValueProvider<String> getQuery();
+ abstract boolean getValidate();
+ @Nullable abstract Boolean getFlattenResults();
+ @Nullable abstract Boolean getUseLegacySql();
+ @Nullable abstract BigQueryServices getBigQueryServices();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+ abstract Builder setQuery(ValueProvider<String> query);
+ abstract Builder setValidate(boolean validate);
+ abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
+ abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
+ abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+
+ abstract Read build();
+ }
+
+ private static Builder builder() {
+ return new AutoValue_BigQueryIO_Read.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl());
+ }
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"} for tables within the current project.
*/
public static Read from(String tableSpec) {
- return new Read().from(StaticValueProvider.of(tableSpec));
+ return from(StaticValueProvider.of(tableSpec));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
public static Read from(ValueProvider<String> tableSpec) {
- return new Read().from(tableSpec);
+ return builder()
+ .setJsonTableRef(
+ NestedValueProvider.of(
+ NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+ new TableRefToJson())).build();
}
/**
* Reads results received after executing the given query.
*/
public static Read fromQuery(String query) {
- return new Read().fromQuery(StaticValueProvider.of(query));
+ return fromQuery(StaticValueProvider.of(query));
}
/**
* Same as {@code from(String)}, but with a {@link ValueProvider}.
*/
public static Read fromQuery(ValueProvider<String> query) {
- return new Read().fromQuery(query);
+ return builder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
}
/**
* Reads a BigQuery table specified as a {@link TableReference} object.
*/
public static Read from(TableReference table) {
- return new Read().from(table);
+ return from(StaticValueProvider.of(toTableSpec(table)));
}
- /**
- * Disable validation that the table exists or the query succeeds prior to pipeline
- * submission. Basic validation (such as ensuring that a query or table is specified) still
- * occurs.
- */
- final boolean validate;
- @Nullable final Boolean flattenResults;
- @Nullable final Boolean useLegacySql;
- @Nullable BigQueryServices bigQueryServices;
-
- @VisibleForTesting @Nullable String stepUuid;
- @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";
- private Read() {
- this(
- null /* name */,
- null /* query */,
- null /* jsonTableRef */,
- true /* validate */,
- null /* flattenResults */,
- null /* useLegacySql */,
- null /* bigQueryServices */);
- }
-
- private Read(
- String name, @Nullable ValueProvider<String> query,
- @Nullable ValueProvider<String> jsonTableRef, boolean validate,
- @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.query = query;
- this.validate = validate;
- this.flattenResults = flattenResults;
- this.useLegacySql = useLegacySql;
- this.bigQueryServices = bigQueryServices;
- }
-
/**
* Disable validation that the table exists or the query succeeds prior to pipeline
* submission. Basic validation (such as ensuring that a query or table is specified) still
* occurs.
*/
public Read withoutValidation() {
- return new Read(
- name, query, jsonTableRef, false /* validate */, flattenResults, useLegacySql,
- bigQueryServices);
+ return toBuilder().setValidate(false).build();
}
/**
@@ -554,9 +545,7 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Read withoutResultFlattening() {
- return new Read(
- name, query, jsonTableRef, validate, false /* flattenResults */, useLegacySql,
- bigQueryServices);
+ return toBuilder().setFlattenResults(false).build();
}
/**
@@ -566,15 +555,12 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Read usingStandardSql() {
- return new Read(
- name, query, jsonTableRef, validate, flattenResults, false /* useLegacySql */,
- bigQueryServices);
+ return toBuilder().setUseLegacySql(false).build();
}
@VisibleForTesting
Read withTestServices(BigQueryServices testServices) {
- return new Read(
- name, query, jsonTableRef, validate, flattenResults, useLegacySql, testServices);
+ return toBuilder().setBigQueryServices(testServices).build();
}
@Override
@@ -587,7 +573,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Read needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
+ if (getBigQueryServices() == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -602,63 +588,65 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
checkState(
- table == null || query == null,
+ table == null || getQuery() == null,
"Invalid BigQueryIO.Read: table reference and query may not both be set");
checkState(
- table != null || query != null,
+ table != null || getQuery() != null,
"Invalid BigQueryIO.Read: one of table reference and query must be set");
if (table != null) {
checkState(
- flattenResults == null,
+ getFlattenResults() == null,
"Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+ " preference, which only applies to queries");
checkState(
- useLegacySql == null,
+ getUseLegacySql() == null,
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ " preference, which only applies to queries");
} else /* query != null */ {
- checkState(flattenResults != null, "flattenResults should not be null if query is set");
- checkState(useLegacySql != null, "useLegacySql should not be null if query is set");
+ checkState(
+ getFlattenResults() != null, "flattenResults should not be null if query is set");
+ checkState(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
}
// Note that a table or query check can fail if the table or dataset are created by
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
- if (validate && table != null) {
+ if (getValidate() && table != null) {
checkState(table.isAccessible(), "Cannot call validate if table is dynamically set.");
// Check for source table presence for early failure notification.
DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions);
verifyDatasetPresence(datasetService, table.get());
verifyTablePresence(datasetService, table.get());
- } else if (validate && query != null) {
- checkState(query.isAccessible(), "Cannot call validate if query is dynamically set.");
+ } else if (getValidate() && getQuery() != null) {
+ checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
jobService.dryRunQuery(
bqOptions.getProject(),
new JobConfigurationQuery()
- .setQuery(query.get())
- .setFlattenResults(flattenResults)
- .setUseLegacySql(useLegacySql));
+ .setQuery(getQuery().get())
+ .setFlattenResults(getFlattenResults())
+ .setUseLegacySql(getUseLegacySql()));
} catch (Exception e) {
throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
}
}
}
@Override
public PCollection<TableRow> expand(PBegin input) {
- stepUuid = randomUUIDString();
+ String stepUuid = randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
- jobUuid = NestedValueProvider.of(
+ ValueProvider<String> jobUuid = NestedValueProvider.of(
StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
final ValueProvider<String> jobIdToken = NestedValueProvider.of(
jobUuid, new BeamJobUuidToBigQueryJobUuid());
BoundedSource<TableRow> source;
- final BigQueryServices bqServices = getBigQueryServices();
+ final BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
final String extractDestinationDir;
String tempLocation = bqOptions.getTempLocation();
@@ -671,11 +659,18 @@ public class BigQueryIO {
}
final String executingProject = bqOptions.getProject();
- if (query != null && (!query.isAccessible() || !Strings.isNullOrEmpty(query.get()))) {
- source = BigQueryQuerySource.create(
- jobIdToken, query, NestedValueProvider.of(
- jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
- flattenResults, useLegacySql, extractDestinationDir, bqServices);
+ if (getQuery() != null
+ && (!getQuery().isAccessible() || !Strings.isNullOrEmpty(getQuery().get()))) {
+ source =
+ BigQueryQuerySource.create(
+ jobIdToken,
+ getQuery(),
+ NestedValueProvider.of(
+ jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+ getFlattenResults(),
+ getUseLegacySql(),
+ extractDestinationDir,
+ bqServices);
} else {
ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
source = BigQueryTableSource.create(
@@ -726,13 +721,13 @@ public class BigQueryIO {
builder
.addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
.withLabel("Table"))
- .addIfNotNull(DisplayData.item("query", query)
+ .addIfNotNull(DisplayData.item("query", getQuery())
.withLabel("Query"))
- .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+ .addIfNotNull(DisplayData.item("flattenResults", getFlattenResults())
.withLabel("Flatten Query Results"))
- .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+ .addIfNotNull(DisplayData.item("useLegacySql", getUseLegacySql())
.withLabel("Use Legacy SQL Dialect"))
- .addIfNotDefault(DisplayData.item("validation", validate)
+ .addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"),
true);
}
@@ -769,8 +764,8 @@ public class BigQueryIO {
*/
@Nullable
public ValueProvider<TableReference> getTableProvider() {
- return jsonTableRef == null
- ? null : NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ return getJsonTableRef() == null
+ ? null : NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
/**
* Returns the table to read, or {@code null} if reading from a query instead.
@@ -780,52 +775,6 @@ public class BigQueryIO {
ValueProvider<TableReference> provider = getTableProvider();
return provider == null ? null : provider.get();
}
-
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public String getQuery() {
- return query == null ? null : query.get();
- }
-
- /**
- * Returns the query to be read, or {@code null} if reading from a table instead.
- */
- @Nullable
- public ValueProvider<String> getQueryProvider() {
- return query;
- }
-
- /**
- * Returns true if table validation is enabled.
- */
- public boolean getValidate() {
- return validate;
- }
-
- /**
- * Returns true/false if result flattening is enabled/disabled, or null if not applicable.
- */
- public Boolean getFlattenResults() {
- return flattenResults;
- }
-
- /**
- * Returns true (false) if the query will (will not) use BigQuery's legacy SQL mode, or null
- * if not applicable.
- */
- @Nullable
- public Boolean getUseLegacySql() {
- return useLegacySql;
- }
-
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
- }
}
/**
@@ -1863,7 +1812,7 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- String stepUuid = randomUUIDString();
+ final String stepUuid = randomUUIDString();
String tempLocation = options.getTempLocation();
String tempFilePrefix;
@@ -1886,7 +1835,7 @@ public class BigQueryIO {
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
- return randomUUIDString();
+ return stepUuid;
}
}))
.apply(View.<String>asSingleton());
http://git-wip-us.apache.org/repos/asf/beam/blob/32cba321/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index f403c5a..fdaa81c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
@@ -678,14 +677,14 @@ public class BigQueryIOTest implements Serializable {
assertEquals(project, read.getTable().getProjectId());
assertEquals(dataset, read.getTable().getDatasetId());
assertEquals(table, read.getTable().getTableId());
- assertNull(read.query);
+ assertNull(read.getQuery());
assertEquals(validate, read.getValidate());
}
private void checkReadQueryObjectWithValidate(
BigQueryIO.Read read, String query, boolean validate) {
assertNull(read.getTable());
- assertEquals(query, read.getQuery());
+ assertEquals(query, read.getQuery().get());
assertEquals(validate, read.getValidate());
}
@@ -2433,20 +2432,4 @@ public class BigQueryIOTest implements Serializable {
BigQueryIO.TableRowInfoCoder.of()),
IntervalWindow.getCoder()));
}
-
- @Test
- public void testUniqueStepIdRead() {
- RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- Pipeline pipeline = TestPipeline.create(options);
- bqOptions.setTempLocation("gs://testbucket/testdir");
- BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
- options.getInputQuery()).withoutValidation();
- pipeline.apply(read1);
- BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
- options.getInputQuery()).withoutValidation();
- pipeline.apply(read2);
- assertNotEquals(read1.stepUuid, read2.stepUuid);
- assertNotEquals(read1.jobUuid.get(), read2.jobUuid.get());
- }
}