You are viewing a plain-text version of this content. The canonical link for it ("here") was not preserved in this rendering.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:55:00 UTC
[06/10] beam git commit: Use AutoValue for BigQueryIO.Write
Use AutoValue for BigQueryIO.Write
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c715896
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c715896
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c715896
Branch: refs/heads/master
Commit: 5c715896e683f7bec9f150ea17c78d1dae00ee4c
Parents: 32cba32
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 18:14:39 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:30 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 314 ++++++-------------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 8 +-
2 files changed, 108 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e5db60e..d2f6ba6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -471,9 +471,9 @@ public class BigQueryIO {
abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
abstract Builder setQuery(ValueProvider<String> query);
abstract Builder setValidate(boolean validate);
- abstract Builder setFlattenResults(@Nullable Boolean flattenResults);
- abstract Builder setUseLegacySql(@Nullable Boolean useLegacySql);
- abstract Builder setBigQueryServices(@Nullable BigQueryServices bigQueryServices);
+ abstract Builder setFlattenResults(Boolean flattenResults);
+ abstract Builder setUseLegacySql(Boolean useLegacySql);
+ abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
abstract Read build();
}
@@ -1364,10 +1364,13 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ @VisibleForTesting
// Maximum number of files in a single partition.
static final int MAX_NUM_FILES = 10000;
+ @VisibleForTesting
// Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
static final long MAX_SIZE_BYTES = 11 * (1L << 40);
@@ -1378,28 +1381,41 @@ public class BigQueryIO {
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
- @Nullable private final ValueProvider<String> jsonTableRef;
-
- @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
- // Table schema. The schema is required only if the table does not exist.
- @Nullable private final ValueProvider<String> jsonSchema;
-
- // Options for creating the table. Valid values are CREATE_IF_NEEDED and
- // CREATE_NEVER.
- final CreateDisposition createDisposition;
+ @Nullable abstract ValueProvider<String> getJsonTableRef();
+ @Nullable abstract SerializableFunction<BoundedWindow, TableReference> getTableRefFunction();
+ /** Table schema. The schema is required only if the table does not exist. */
+ @Nullable abstract ValueProvider<String> getJsonSchema();
+ abstract CreateDisposition getCreateDisposition();
+ abstract WriteDisposition getWriteDisposition();
+ @Nullable abstract String getTableDescription();
+ /** An option to indicate if table validation is desired. Default is true. */
+ abstract boolean getValidate();
+ @Nullable abstract BigQueryServices getBigQueryServices();
- // Options for writing to the table. Valid values are WRITE_TRUNCATE,
- // WRITE_APPEND and WRITE_EMPTY.
- final WriteDisposition writeDisposition;
+ abstract Builder toBuilder();
- @Nullable
- final String tableDescription;
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setJsonTableRef(ValueProvider<String> jsonTableRef);
+ abstract Builder setTableRefFunction(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction);
+ abstract Builder setJsonSchema(ValueProvider<String> jsonSchema);
+ abstract Builder setCreateDisposition(CreateDisposition createDisposition);
+ abstract Builder setWriteDisposition(WriteDisposition writeDisposition);
+ abstract Builder setTableDescription(String tableDescription);
+ abstract Builder setValidate(boolean validate);
+ abstract Builder setBigQueryServices(BigQueryServices bigQueryServices);
- // An option to indicate if table validation is desired. Default is true.
- final boolean validate;
+ abstract Write build();
+ }
- @Nullable private BigQueryServices bigQueryServices;
+ private static Builder builder() {
+ return new AutoValue_BigQueryIO_Write.Builder()
+ .setValidate(true)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .setWriteDisposition(WriteDisposition.WRITE_EMPTY);
+ }
/**
* An enumeration type for the BigQuery create disposition strings.
@@ -1474,17 +1490,22 @@ public class BigQueryIO {
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
*/
public static Write to(String tableSpec) {
- return new Write().withTableSpec(tableSpec);
+ return to(StaticValueProvider.of(tableSpec));
}
/** Creates a write transformation for the given table. */
- public static Write to(ValueProvider<String> tableSpec) {
- return new Write().withTableSpec(tableSpec);
+ public static Write to(TableReference table) {
+ return to(StaticValueProvider.of(toTableSpec(table)));
}
/** Creates a write transformation for the given table. */
- public static Write to(TableReference table) {
- return new Write().withTableRef(table);
+ public static Write to(ValueProvider<String> tableSpec) {
+ return builder()
+ .setJsonTableRef(
+ NestedValueProvider.of(
+ NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
+ new TableRefToJson()))
+ .build();
}
/**
@@ -1499,7 +1520,7 @@ public class BigQueryIO {
* always return the same table specification.
*/
public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return new Write().withTableSpec(tableSpecFunction);
+ return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
}
/**
@@ -1511,7 +1532,7 @@ public class BigQueryIO {
*/
private static Write toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Write().withTableRef(tableRefFunction);
+ return builder().setTableRefFunction(tableRefFunction).build();
}
private static class TranslateTableSpecFunction implements
@@ -1528,109 +1549,6 @@ public class BigQueryIO {
}
}
- private Write() {
- this(
- null /* name */,
- null /* jsonTableRef */,
- null /* tableRefFunction */,
- null /* jsonSchema */,
- CreateDisposition.CREATE_IF_NEEDED,
- WriteDisposition.WRITE_EMPTY,
- null /* tableDescription */,
- true /* validate */,
- null /* bigQueryServices */);
- }
-
- private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<String> jsonSchema,
- CreateDisposition createDisposition,
- WriteDisposition writeDisposition,
- @Nullable String tableDescription,
- boolean validate,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.tableRefFunction = tableRefFunction;
- this.jsonSchema = jsonSchema;
- this.createDisposition = checkNotNull(createDisposition, "createDisposition");
- this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
- this.tableDescription = tableDescription;
- this.validate = validate;
- this.bigQueryServices = bigQueryServices;
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- private Write withTableSpec(String tableSpec) {
- return withTableRef(NestedValueProvider.of(
- StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Write withTableRef(TableReference table) {
- return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Write withTableSpec(ValueProvider<String> tableSpec) {
- return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- private Write withTableRef(ValueProvider<TableReference> table) {
- return new Write(name,
- NestedValueProvider.of(table, new TableRefToJson()),
- tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
-
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
- * should always return the same table specification.
- */
- private Write withTableSpec(
- SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
- }
-
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
- * always return the same table reference.
- */
- private Write withTableRef(
- SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
-
/**
* Returns a copy of this write transformation, but using the specified schema for rows
* to be written.
@@ -1638,18 +1556,18 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withSchema(TableSchema schema) {
- return new Write(name, jsonTableRef, tableRefFunction,
- StaticValueProvider.of(toJsonString(schema)),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder()
+ .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+ .build();
}
/**
* Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
*/
public Write withSchema(ValueProvider<TableSchema> schema) {
- return new Write(name, jsonTableRef, tableRefFunction,
- NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder()
+ .setJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()))
+ .build();
}
/**
@@ -1658,8 +1576,7 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withCreateDisposition(CreateDisposition createDisposition) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder().setCreateDisposition(createDisposition).build();
}
/**
@@ -1668,8 +1585,7 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withWriteDisposition(WriteDisposition writeDisposition) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ return toBuilder().setWriteDisposition(writeDisposition).build();
}
/**
@@ -1677,9 +1593,8 @@ public class BigQueryIO {
*
* <p>Does not modify this object.
*/
- public Write withTableDescription(@Nullable String tableDescription) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ public Write withTableDescription(String tableDescription) {
+ return toBuilder().setTableDescription(tableDescription).build();
}
/**
@@ -1688,14 +1603,12 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Write withoutValidation() {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, false, bigQueryServices);
+ return toBuilder().setValidate(false).build();
}
@VisibleForTesting
Write withTestServices(BigQueryServices testServices) {
- return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, testServices);
+ return toBuilder().setBigQueryServices(testServices).build();
}
private static void verifyTableNotExistOrEmpty(
@@ -1724,23 +1637,25 @@ public class BigQueryIO {
// Exactly one of the table and table reference can be configured.
checkState(
- jsonTableRef != null || tableRefFunction != null,
+ getJsonTableRef() != null || getTableRefFunction() != null,
"must set the table reference of a BigQueryIO.Write transform");
checkState(
- jsonTableRef == null || tableRefFunction == null,
+ getJsonTableRef() == null || getTableRefFunction() == null,
"Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ " transform");
// Require a schema if creating one or more tables.
checkArgument(
- createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || getJsonSchema() != null,
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
// The user specified a table.
- if (jsonTableRef != null && validate) {
+ BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
+ if (getJsonTableRef() != null && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ DatasetService datasetService = bqServices.getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
@@ -1754,22 +1669,22 @@ public class BigQueryIO {
}
}
- if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
// We will use BigQuery's streaming write API -- validate supported dispositions.
- if (tableRefFunction != null) {
+ if (getTableRefFunction() != null) {
checkArgument(
- createDisposition != CreateDisposition.CREATE_NEVER,
+ getCreateDisposition() != CreateDisposition.CREATE_NEVER,
"CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+ " function.");
}
- if (jsonSchema == null) {
+ if (getJsonSchema() == null) {
checkArgument(
- createDisposition == CreateDisposition.CREATE_NEVER,
+ getCreateDisposition() == CreateDisposition.CREATE_NEVER,
"CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
}
checkArgument(
- writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ " when using a tablespec function.");
} else {
@@ -1778,7 +1693,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
-      if (bigQueryServices == null) {
+      if (bqServices instanceof BigQueryServicesImpl) {  // bqServices is never null (firstNonNull); only require gs:// for the real service
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -1796,17 +1711,18 @@ public class BigQueryIO {
public PDone expand(PCollection<TableRow> input) {
Pipeline p = input.getPipeline();
BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices = getBigQueryServices();
+ BigQueryServices bqServices =
+ MoreObjects.firstNonNull(getBigQueryServices(), new BigQueryServicesImpl());
// When writing an Unbounded PCollection, or when a tablespec function is defined, we use
// StreamWithDeDup and BigQuery's streaming import API.
- if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+ if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
return input.apply(
- new StreamWithDeDup(getTable(), tableRefFunction,
- jsonSchema == null ? null : NestedValueProvider.of(
- jsonSchema, new JsonSchemaToTableSchema()),
- createDisposition,
- tableDescription,
+ new StreamWithDeDup(getTable(), getTableRefFunction(),
+ getJsonSchema() == null ? null : NestedValueProvider.of(
+ getJsonSchema(), new JsonSchemaToTableSchema()),
+ getCreateDisposition(),
+ getTableDescription(),
bqServices));
}
@@ -1874,10 +1790,10 @@ public class BigQueryIO {
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
+ getJsonSchema(),
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
- tableDescription))
+ getTableDescription()))
.withSideInputs(jobIdTokenView));
PCollectionView<Iterable<String>> tempTablesView = tempTables
@@ -1887,10 +1803,10 @@ public class BigQueryIO {
bqServices,
jobIdTokenView,
NestedValueProvider.of(table, new TableRefToJson()),
- writeDisposition,
- createDisposition,
+ getWriteDisposition(),
+ getCreateDisposition(),
tempTablesView,
- tableDescription))
+ getTableDescription()))
.withSideInputs(tempTablesView, jobIdTokenView));
// Write single partition to final table
@@ -1902,10 +1818,10 @@ public class BigQueryIO {
jobIdTokenView,
tempFilePrefix,
NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- writeDisposition,
- createDisposition,
- tableDescription))
+ getJsonSchema(),
+ getWriteDisposition(),
+ getCreateDisposition(),
+ getTableDescription()))
.withSideInputs(jobIdTokenView));
return PDone.in(input.getPipeline());
@@ -1969,41 +1885,31 @@ public class BigQueryIO {
super.populateDisplayData(builder);
builder
- .addIfNotNull(DisplayData.item("table", jsonTableRef)
+ .addIfNotNull(DisplayData.item("table", getJsonTableRef())
.withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .addIfNotNull(DisplayData.item("schema", getJsonSchema())
.withLabel("Table Schema"));
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ if (getTableRefFunction() != null) {
+ builder.add(DisplayData.item("tableFn", getTableRefFunction().getClass())
.withLabel("Table Reference Function"));
}
builder
- .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .add(DisplayData.item("createDisposition", getCreateDisposition().toString())
.withLabel("Table CreateDisposition"))
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .add(DisplayData.item("writeDisposition", getWriteDisposition().toString())
.withLabel("Table WriteDisposition"))
- .addIfNotDefault(DisplayData.item("validation", validate)
+ .addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+ .addIfNotNull(DisplayData.item("tableDescription", getTableDescription())
.withLabel("Table Description"));
}
- /** Returns the create disposition. */
- public CreateDisposition getCreateDisposition() {
- return createDisposition;
- }
-
- /** Returns the write disposition. */
- public WriteDisposition getWriteDisposition() {
- return writeDisposition;
- }
-
/** Returns the table schema. */
public TableSchema getSchema() {
return fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
}
/**
@@ -2036,20 +1942,8 @@ public class BigQueryIO {
/** Returns the table reference, or {@code null}. */
@Nullable
public ValueProvider<TableReference> getTable() {
- return jsonTableRef == null ? null :
- NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
- }
-
- /** Returns {@code true} if table validation is enabled. */
- public boolean getValidate() {
- return validate;
- }
-
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
+ return getJsonTableRef() == null ? null :
+ NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
static class TableRowWriter {
http://git-wip-us.apache.org/repos/asf/beam/blob/5c715896/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index fdaa81c..888d9c1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -712,10 +712,10 @@ public class BigQueryIOTest implements Serializable {
assertEquals(dataset, write.getTable().get().getDatasetId());
assertEquals(table, write.getTable().get().getTableId());
assertEquals(schema, write.getSchema());
- assertEquals(createDisposition, write.createDisposition);
- assertEquals(writeDisposition, write.writeDisposition);
- assertEquals(tableDescription, write.tableDescription);
- assertEquals(validate, write.validate);
+ assertEquals(createDisposition, write.getCreateDisposition());
+ assertEquals(writeDisposition, write.getWriteDisposition());
+ assertEquals(tableDescription, write.getTableDescription());
+ assertEquals(validate, write.getValidate());
}
@Before