Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:57 UTC
[03/10] beam git commit: Condense BigQueryIO.Write.Bound into BigQueryIO.Write
Condense BigQueryIO.Write.Bound into BigQueryIO.Write
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d1f4400
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d1f4400
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d1f4400
Branch: refs/heads/master
Commit: 7d1f4400ab844c7b4e636482891be55174390431
Parents: 825338a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:26:09 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:24 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 1080 +++++++++---------
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 116 +-
2 files changed, 552 insertions(+), 644 deletions(-)
----------------------------------------------------------------------
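For readers skimming the diff: the net effect of this commit is that the builder methods which previously returned the nested BigQueryIO.Write.Bound now live directly on BigQueryIO.Write, which itself extends PTransform<PCollection<TableRow>, PDone>. Below is a minimal sketch of how a caller reads after the change; the project, dataset, table, and schema names are illustrative only and are not taken from this commit.

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Arrays;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

    // Hypothetical schema for the example table.
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("word").setType("STRING"),
        new TableFieldSchema().setName("count").setType("INTEGER")));

    // Previously this chain produced a BigQueryIO.Write.Bound; after this commit it is
    // simply a BigQueryIO.Write, and there is no separate Bound type to name.
    BigQueryIO.Write write = BigQueryIO.Write
        .to("my-project:my_dataset.my_table")   // illustrative table spec
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

    rows.apply(write);   // rows: a PCollection<TableRow> defined elsewhere in the pipeline

The function-valued overload to(SerializableFunction<BoundedWindow, String>) is condensed in the same way: it now returns Write directly and, as the validate() hunk below notes, routes the write through BigQuery's streaming insert path.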
http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f6c8575..90d7f67 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1415,7 +1415,43 @@ public class BigQueryIO {
* }
* }}</pre>
*/
- public static class Write {
+ public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+ // Maximum number of files in a single partition.
+ static final int MAX_NUM_FILES = 10000;
+
+ // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
+ static final long MAX_SIZE_BYTES = 11 * (1L << 40);
+
+ // The maximum number of retry jobs.
+ private static final int MAX_RETRY_JOBS = 3;
+
+ // The maximum number of retries to poll the status of a job.
+ // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+ private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ @Nullable private final ValueProvider<String> jsonTableRef;
+
+ @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+
+ // Table schema. The schema is required only if the table does not exist.
+ @Nullable private final ValueProvider<String> jsonSchema;
+
+ // Options for creating the table. Valid values are CREATE_IF_NEEDED and
+ // CREATE_NEVER.
+ final CreateDisposition createDisposition;
+
+ // Options for writing to the table. Valid values are WRITE_TRUNCATE,
+ // WRITE_APPEND and WRITE_EMPTY.
+ final WriteDisposition writeDisposition;
+
+ @Nullable
+ final String tableDescription;
+
+ // An option to indicate if table validation is desired. Default is true.
+ final boolean validate;
+
+ @Nullable private BigQueryServices bigQueryServices;
+
/**
* An enumeration type for the BigQuery create disposition strings.
*
@@ -1488,18 +1524,18 @@ public class BigQueryIO {
*
* <p>Refer to {@link #parseTableSpec(String)} for the specification format.
*/
- public static Bound to(String tableSpec) {
- return new Bound().to(tableSpec);
+ public static Write to(String tableSpec) {
+ return new Write().withTableSpec(tableSpec);
}
/** Creates a write transformation for the given table. */
- public static Bound to(ValueProvider<String> tableSpec) {
- return new Bound().to(tableSpec);
+ public static Write to(ValueProvider<String> tableSpec) {
+ return new Write().withTableSpec(tableSpec);
}
/** Creates a write transformation for the given table. */
- public static Bound to(TableReference table) {
- return new Bound().to(table);
+ public static Write to(TableReference table) {
+ return new Write().withTableRef(table);
}
/**
@@ -1513,8 +1549,8 @@ public class BigQueryIO {
* <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
* always return the same table specification.
*/
- public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return new Bound().to(tableSpecFunction);
+ public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return new Write().withTableSpec(tableSpecFunction);
}
/**
@@ -1524,634 +1560,547 @@ public class BigQueryIO {
* <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
* always return the same table reference.
*/
- public static Bound toTableReference(
+ private static Write toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Bound().toTableReference(tableRefFunction);
+ return new Write().withTableRef(tableRefFunction);
+ }
+
+ private static class TranslateTableSpecFunction implements
+ SerializableFunction<BoundedWindow, TableReference> {
+ private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+
+ TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ this.tableSpecFunction = tableSpecFunction;
+ }
+
+ @Override
+ public TableReference apply(BoundedWindow value) {
+ return parseTableSpec(tableSpecFunction.apply(value));
+ }
+ }
+
+ private Write() {
+ this(
+ null /* name */,
+ null /* jsonTableRef */,
+ null /* tableRefFunction */,
+ null /* jsonSchema */,
+ CreateDisposition.CREATE_IF_NEEDED,
+ WriteDisposition.WRITE_EMPTY,
+ null /* tableDescription */,
+ true /* validate */,
+ null /* bigQueryServices */);
+ }
+
+ private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
+ @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+ @Nullable ValueProvider<String> jsonSchema,
+ CreateDisposition createDisposition,
+ WriteDisposition writeDisposition,
+ @Nullable String tableDescription,
+ boolean validate,
+ @Nullable BigQueryServices bigQueryServices) {
+ super(name);
+ this.jsonTableRef = jsonTableRef;
+ this.tableRefFunction = tableRefFunction;
+ this.jsonSchema = jsonSchema;
+ this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+ this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+ this.tableDescription = tableDescription;
+ this.validate = validate;
+ this.bigQueryServices = bigQueryServices;
}
/**
- * Creates a write transformation with the specified schema to use in table creation.
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
*
- * <p>The schema is <i>required</i> only if writing to a table that does not already
- * exist, and {@link CreateDisposition} is set to
- * {@link CreateDisposition#CREATE_IF_NEEDED}.
+ * <p>Does not modify this object.
*/
- public static Bound withSchema(TableSchema schema) {
- return new Bound().withSchema(schema);
+ private Write withTableSpec(String tableSpec) {
+ return withTableRef(NestedValueProvider.of(
+ StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
}
/**
- * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
*/
- public static Bound withSchema(ValueProvider<TableSchema> schema) {
- return new Bound().withSchema(schema);
+ public Write withTableRef(TableReference table) {
+ return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
}
- /** Creates a write transformation with the specified options for creating the table. */
- public static Bound withCreateDisposition(CreateDisposition disposition) {
- return new Bound().withCreateDisposition(disposition);
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableSpec(ValueProvider<String> tableSpec) {
+ return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
}
- /** Creates a write transformation with the specified options for writing to the table. */
- public static Bound withWriteDisposition(WriteDisposition disposition) {
- return new Bound().withWriteDisposition(disposition);
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ private Write withTableRef(ValueProvider<TableReference> table) {
+ return new Write(name,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, bigQueryServices);
}
- /** Creates a write transformation with the specified table description. */
- public static Bound withTableDescription(@Nullable String tableDescription) {
- return new Bound().withTableDescription(tableDescription);
+ /**
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
+ * should always return the same table specification.
+ */
+ private Write withTableSpec(
+ SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
}
/**
- * Creates a write transformation with BigQuery table validation disabled.
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+ * always return the same table reference.
*/
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
+ private Write withTableRef(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, bigQueryServices);
}
/**
- * A {@link PTransform} that can write either a bounded or unbounded
- * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
+ * Returns a copy of this write transformation, but using the specified schema for rows
+ * to be written.
+ *
+ * <p>Does not modify this object.
*/
- public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
- // Maximum number of files in a single partition.
- static final int MAX_NUM_FILES = 10000;
-
- // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
- static final long MAX_SIZE_BYTES = 11 * (1L << 40);
-
- // The maximum number of retry jobs.
- static final int MAX_RETRY_JOBS = 3;
-
- // The maximum number of retries to poll the status of a job.
- // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
- static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
- @Nullable final ValueProvider<String> jsonTableRef;
-
- @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
- // Table schema. The schema is required only if the table does not exist.
- @Nullable final ValueProvider<String> jsonSchema;
+ public Write withSchema(TableSchema schema) {
+ return new Write(name, jsonTableRef, tableRefFunction,
+ StaticValueProvider.of(toJsonString(schema)),
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // Options for creating the table. Valid values are CREATE_IF_NEEDED and
- // CREATE_NEVER.
- final CreateDisposition createDisposition;
+ /**
+ * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+ */
+ public Write withSchema(ValueProvider<TableSchema> schema) {
+ return new Write(name, jsonTableRef, tableRefFunction,
+ NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // Options for writing to the table. Valid values are WRITE_TRUNCATE,
- // WRITE_APPEND and WRITE_EMPTY.
- final WriteDisposition writeDisposition;
+ /**
+ * Returns a copy of this write transformation, but using the specified create disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withCreateDisposition(CreateDisposition createDisposition) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- @Nullable final String tableDescription;
+ /**
+ * Returns a copy of this write transformation, but using the specified write disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withWriteDisposition(WriteDisposition writeDisposition) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- // An option to indicate if table validation is desired. Default is true.
- final boolean validate;
+ /**
+ * Returns a copy of this write transformation, but using the specified table description.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withTableDescription(@Nullable String tableDescription) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+ createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ }
- @Nullable private BigQueryServices bigQueryServices;
+ /**
+ * Returns a copy of this write transformation, but without BigQuery table validation.
+ *
+ * <p>Does not modify this object.
+ */
+ public Write withoutValidation() {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, false, bigQueryServices);
+ }
- private static class TranslateTableSpecFunction implements
- SerializableFunction<BoundedWindow, TableReference> {
- private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+ @VisibleForTesting
+ Write withTestServices(BigQueryServices testServices) {
+ return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, tableDescription, validate, testServices);
+ }
- TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- this.tableSpecFunction = tableSpecFunction;
+ private static void verifyTableNotExistOrEmpty(
+ DatasetService datasetService,
+ TableReference tableRef) {
+ try {
+ if (datasetService.getTable(tableRef) != null) {
+ checkState(
+ datasetService.isTableEmpty(tableRef),
+ "BigQuery table is not empty: %s.",
+ BigQueryIO.toTableSpec(tableRef));
}
-
- @Override
- public TableReference apply(BoundedWindow value) {
- return parseTableSpec(tableSpecFunction.apply(value));
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
+ throw new RuntimeException(
+ "unable to confirm BigQuery table emptiness for table "
+ + BigQueryIO.toTableSpec(tableRef), e);
}
+ }
- /**
- * @deprecated Should be private. Instead, use one of the factory methods in
- * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
- * instance of this class.
- */
- @Deprecated
- public Bound() {
- this(
- null /* name */,
- null /* jsonTableRef */,
- null /* tableRefFunction */,
- null /* jsonSchema */,
- CreateDisposition.CREATE_IF_NEEDED,
- WriteDisposition.WRITE_EMPTY,
- null /* tableDescription */,
- true /* validate */,
- null /* bigQueryServices */);
- }
-
- private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
- @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
- @Nullable ValueProvider<String> jsonSchema,
- CreateDisposition createDisposition,
- WriteDisposition writeDisposition,
- @Nullable String tableDescription,
- boolean validate,
- @Nullable BigQueryServices bigQueryServices) {
- super(name);
- this.jsonTableRef = jsonTableRef;
- this.tableRefFunction = tableRefFunction;
- this.jsonSchema = jsonSchema;
- this.createDisposition = checkNotNull(createDisposition, "createDisposition");
- this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
- this.tableDescription = tableDescription;
- this.validate = validate;
- this.bigQueryServices = bigQueryServices;
- }
-
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Bound to(String tableSpec) {
- return toTableRef(NestedValueProvider.of(
- StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
- }
+ @Override
+ public void validate(PCollection<TableRow> input) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- public Bound to(TableReference table) {
- return to(StaticValueProvider.of(toTableSpec(table)));
- }
+ // Exactly one of the table reference and the table function can be configured.
+ checkState(
+ jsonTableRef != null || tableRefFunction != null,
+ "must set the table reference of a BigQueryIO.Write transform");
+ checkState(
+ jsonTableRef == null || tableRefFunction == null,
+ "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ + " transform");
- /**
- * Returns a copy of this write transformation, but writing to the specified table. Refer to
- * {@link #parseTableSpec(String)} for the specification format.
- *
- * <p>Does not modify this object.
- */
- public Bound to(ValueProvider<String> tableSpec) {
- return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+ // Require a schema if creating one or more tables.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+ // The user specified a table.
+ if (jsonTableRef != null && validate) {
+ TableReference table = getTableWithDefaultProject(options).get();
+
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
+ verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ verifyTableNotExistOrEmpty(datasetService, table);
+ }
}
- /**
- * Returns a copy of this write transformation, but writing to the specified table.
- *
- * <p>Does not modify this object.
- */
- private Bound toTableRef(ValueProvider<TableReference> table) {
- return new Bound(name,
- NestedValueProvider.of(table, new TableRefToJson()),
- tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+ // We will use BigQuery's streaming write API -- validate supported dispositions.
+ if (tableRefFunction != null) {
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+ + " function.");
+ }
+ if (jsonSchema == null) {
+ checkArgument(
+ createDisposition == CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+ }
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
- * should always return the same table specification.
- */
- public Bound to(
- SerializableFunction<BoundedWindow, String> tableSpecFunction) {
- return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+ checkArgument(
+ writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ + " when using a tablespec function.");
+ } else {
+ // We will use a BigQuery load job -- validate the temp location.
+ String tempLocation = options.getTempLocation();
+ checkArgument(
+ !Strings.isNullOrEmpty(tempLocation),
+ "BigQueryIO.Write needs a GCS temp location to store temp files.");
+ if (bigQueryServices == null) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+ tempLocation),
+ e);
+ }
+ }
}
+ }
- /**
- * Returns a copy of this write transformation, but using the specified function to determine
- * which table to write to for each window.
- *
- * <p>Does not modify this object.
- *
- * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
- * always return the same table reference.
- */
- public Bound toTableReference(
- SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, bigQueryServices);
+ @Override
+ public PDone expand(PCollection<TableRow> input) {
+ Pipeline p = input.getPipeline();
+ BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+ BigQueryServices bqServices = getBigQueryServices();
+
+ // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
+ // StreamWithDeDup and BigQuery's streaming import API.
+ if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+ return input.apply(
+ new StreamWithDeDup(getTable(), tableRefFunction,
+ jsonSchema == null ? null : NestedValueProvider.of(
+ jsonSchema, new JsonSchemaToTableSchema()),
+ createDisposition,
+ tableDescription,
+ bqServices));
}
- /**
- * Returns a copy of this write transformation, but using the specified schema for rows
- * to be written.
- *
- * <p>Does not modify this object.
- */
- public Bound withSchema(TableSchema schema) {
- return new Bound(name, jsonTableRef, tableRefFunction,
- StaticValueProvider.of(toJsonString(schema)),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- /**
- * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
- */
- public Bound withSchema(ValueProvider<TableSchema> schema) {
- return new Bound(name, jsonTableRef, tableRefFunction,
- NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ String stepUuid = randomUUIDString();
- /**
- * Returns a copy of this write transformation, but using the specified create disposition.
- *
- * <p>Does not modify this object.
- */
- public Bound withCreateDisposition(CreateDisposition createDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+ String tempLocation = options.getTempLocation();
+ String tempFilePrefix;
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ tempFilePrefix = factory.resolve(
+ factory.resolve(tempLocation, "BigQueryWriteTemp"),
+ stepUuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+ e);
}
- /**
- * Returns a copy of this write transformation, but using the specified write disposition.
- *
- * <p>Does not modify this object.
- */
- public Bound withWriteDisposition(WriteDisposition writeDisposition) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ // Create a singleton job ID token at execution time.
+ PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+ PCollectionView<String> jobIdTokenView = p
+ .apply("TriggerIdCreation", Create.of("ignored"))
+ .apply("CreateJobId", MapElements.via(
+ new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return randomUUIDString();
+ }
+ }))
+ .apply(View.<String>asSingleton());
+
+ PCollection<TableRow> inputInGlobalWindow =
+ input.apply(
+ Window.<TableRow>into(new GlobalWindows())
+ .triggering(DefaultTrigger.of())
+ .discardingFiredPanes());
+
+ PCollection<KV<String, Long>> results = inputInGlobalWindow
+ .apply("WriteBundles",
+ ParDo.of(new WriteBundles(tempFilePrefix)));
+
+ TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+ new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+ TupleTag<KV<Long, List<String>>> singlePartitionTag =
+ new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+ PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+ .apply("ResultsView", View.<KV<String, Long>>asIterable());
+ PCollectionTuple partitions = singleton.apply(ParDo
+ .of(new WritePartition(
+ resultsView,
+ multiPartitionsTag,
+ singlePartitionTag))
+ .withSideInputs(resultsView)
+ .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+ // Write multiple partitions to separate temporary tables
+ PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+ .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+ false,
+ bqServices,
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ jsonSchema,
+ WriteDisposition.WRITE_EMPTY,
+ CreateDisposition.CREATE_IF_NEEDED,
+ tableDescription))
+ .withSideInputs(jobIdTokenView));
+
+ PCollectionView<Iterable<String>> tempTablesView = tempTables
+ .apply("TempTablesView", View.<String>asIterable());
+ singleton.apply(ParDo
+ .of(new WriteRename(
+ bqServices,
+ jobIdTokenView,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ writeDisposition,
+ createDisposition,
+ tempTablesView,
+ tableDescription))
+ .withSideInputs(tempTablesView, jobIdTokenView));
+
+ // Write single partition to final table
+ partitions.get(singlePartitionTag)
+ .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+ .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+ true,
+ bqServices,
+ jobIdTokenView,
+ tempFilePrefix,
+ NestedValueProvider.of(table, new TableRefToJson()),
+ jsonSchema,
+ writeDisposition,
+ createDisposition,
+ tableDescription))
+ .withSideInputs(jobIdTokenView));
- /**
- * Returns a copy of this write transformation, but using the specified table description.
- *
- * <p>Does not modify this object.
- */
- public Bound withTableDescription(@Nullable String tableDescription) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
- createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
- }
+ return PDone.in(input.getPipeline());
+ }
- /**
- * Returns a copy of this write transformation, but without BigQuery table validation.
- *
- * <p>Does not modify this object.
- */
- public Bound withoutValidation() {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, false, bigQueryServices);
- }
+ private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
+ private transient TableRowWriter writer = null;
+ private final String tempFilePrefix;
- @VisibleForTesting
- Bound withTestServices(BigQueryServices testServices) {
- return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, tableDescription, validate, testServices);
+ WriteBundles(String tempFilePrefix) {
+ this.tempFilePrefix = tempFilePrefix;
}
- private static void verifyTableNotExistOrEmpty(
- DatasetService datasetService,
- TableReference tableRef) {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ if (writer == null) {
+ writer = new TableRowWriter(tempFilePrefix);
+ writer.open(UUID.randomUUID().toString());
+ LOG.debug("Done opening writer {}", writer);
+ }
try {
- if (datasetService.getTable(tableRef) != null) {
- checkState(
- datasetService.isTableEmpty(tableRef),
- "BigQuery table is not empty: %s.",
- BigQueryIO.toTableSpec(tableRef));
- }
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
+ writer.write(c.element());
+ } catch (Exception e) {
+ // Discard write result and close the write.
+ try {
+ writer.close();
+ // The writer does not need to be reset, as this DoFn cannot be reused.
+ } catch (Exception closeException) {
+ // Do not mask the exception that caused the write to fail.
+ e.addSuppressed(closeException);
}
- throw new RuntimeException(
- "unable to confirm BigQuery table emptiness for table "
- + BigQueryIO.toTableSpec(tableRef), e);
+ throw e;
}
}
- @Override
- public void validate(PCollection<TableRow> input) {
- BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
- // Exactly one of the table and table reference can be configured.
- checkState(
- jsonTableRef != null || tableRefFunction != null,
- "must set the table reference of a BigQueryIO.Write transform");
- checkState(
- jsonTableRef == null || tableRefFunction == null,
- "Cannot set both a table reference and a table function for a BigQueryIO.Write"
- + " transform");
-
- // Require a schema if creating one or more tables.
- checkArgument(
- createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
- "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
- // The user specified a table.
- if (jsonTableRef != null && validate) {
- TableReference table = getTableWithDefaultProject(options).get();
-
- DatasetService datasetService = getBigQueryServices().getDatasetService(options);
- // Check for destination table presence and emptiness for early failure notification.
- // Note that a presence check can fail when the table or dataset is created by an earlier
- // stage of the pipeline. For these cases the #withoutValidation method can be used to
- // disable the check.
- verifyDatasetPresence(datasetService, table);
- if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- verifyTablePresence(datasetService, table);
- }
- if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- verifyTableNotExistOrEmpty(datasetService, table);
- }
- }
-
- if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
- // We will use BigQuery's streaming write API -- validate supported dispositions.
- if (tableRefFunction != null) {
- checkArgument(
- createDisposition != CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
- + " function.");
- }
- if (jsonSchema == null) {
- checkArgument(
- createDisposition == CreateDisposition.CREATE_NEVER,
- "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
- }
-
- checkArgument(
- writeDisposition != WriteDisposition.WRITE_TRUNCATE,
- "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
- + " when using a tablespec function.");
- } else {
- // We will use a BigQuery load job -- validate the temp location.
- String tempLocation = options.getTempLocation();
- checkArgument(
- !Strings.isNullOrEmpty(tempLocation),
- "BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (bigQueryServices == null) {
- try {
- GcsPath.fromUri(tempLocation);
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
- tempLocation),
- e);
- }
- }
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (writer != null) {
+ c.output(writer.close());
+ writer = null;
}
}
@Override
- public PDone expand(PCollection<TableRow> input) {
- Pipeline p = input.getPipeline();
- BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
- BigQueryServices bqServices = getBigQueryServices();
-
- // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
- // StreamWithDeDup and BigQuery's streaming import API.
- if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
- return input.apply(
- new StreamWithDeDup(getTable(), tableRefFunction,
- jsonSchema == null ? null : NestedValueProvider.of(
- jsonSchema, new JsonSchemaToTableSchema()),
- createDisposition,
- tableDescription,
- bqServices));
- }
-
- ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
- String stepUuid = randomUUIDString();
-
- String tempLocation = options.getTempLocation();
- String tempFilePrefix;
- try {
- IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
- tempFilePrefix = factory.resolve(
- factory.resolve(tempLocation, "BigQueryWriteTemp"),
- stepUuid);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
- e);
- }
-
- // Create a singleton job ID token at execution time.
- PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
- PCollectionView<String> jobIdTokenView = p
- .apply("TriggerIdCreation", Create.of("ignored"))
- .apply("CreateJobId", MapElements.via(
- new SimpleFunction<String, String>() {
- @Override
- public String apply(String input) {
- return randomUUIDString();
- }
- }))
- .apply(View.<String>asSingleton());
-
- PCollection<TableRow> inputInGlobalWindow =
- input.apply(
- Window.<TableRow>into(new GlobalWindows())
- .triggering(DefaultTrigger.of())
- .discardingFiredPanes());
-
- PCollection<KV<String, Long>> results = inputInGlobalWindow
- .apply("WriteBundles",
- ParDo.of(new WriteBundles(tempFilePrefix)));
-
- TupleTag<KV<Long, List<String>>> multiPartitionsTag =
- new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<Long, List<String>>> singlePartitionTag =
- new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
- PCollectionView<Iterable<KV<String, Long>>> resultsView = results
- .apply("ResultsView", View.<KV<String, Long>>asIterable());
- PCollectionTuple partitions = singleton.apply(ParDo
- .of(new WritePartition(
- resultsView,
- multiPartitionsTag,
- singlePartitionTag))
- .withSideInputs(resultsView)
- .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
- // Write multiple partitions to separate temporary tables
- PCollection<String> tempTables = partitions.get(multiPartitionsTag)
- .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
- false,
- bqServices,
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- WriteDisposition.WRITE_EMPTY,
- CreateDisposition.CREATE_IF_NEEDED,
- tableDescription))
- .withSideInputs(jobIdTokenView));
-
- PCollectionView<Iterable<String>> tempTablesView = tempTables
- .apply("TempTablesView", View.<String>asIterable());
- singleton.apply(ParDo
- .of(new WriteRename(
- bqServices,
- jobIdTokenView,
- NestedValueProvider.of(table, new TableRefToJson()),
- writeDisposition,
- createDisposition,
- tempTablesView,
- tableDescription))
- .withSideInputs(tempTablesView, jobIdTokenView));
-
- // Write single partition to final table
- partitions.get(singlePartitionTag)
- .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
- .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
- true,
- bqServices,
- jobIdTokenView,
- tempFilePrefix,
- NestedValueProvider.of(table, new TableRefToJson()),
- jsonSchema,
- writeDisposition,
- createDisposition,
- tableDescription))
- .withSideInputs(jobIdTokenView));
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- return PDone.in(input.getPipeline());
+ builder
+ .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+ .withLabel("Temporary File Prefix"));
}
+ }
- private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
- private transient TableRowWriter writer = null;
- private final String tempFilePrefix;
-
- WriteBundles(String tempFilePrefix) {
- this.tempFilePrefix = tempFilePrefix;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- if (writer == null) {
- writer = new TableRowWriter(tempFilePrefix);
- writer.open(UUID.randomUUID().toString());
- LOG.debug("Done opening writer {}", writer);
- }
- try {
- writer.write(c.element());
- } catch (Exception e) {
- // Discard write result and close the write.
- try {
- writer.close();
- // The writer does not need to be reset, as this DoFn cannot be reused.
- } catch (Exception closeException) {
- // Do not mask the exception that caused the write to fail.
- e.addSuppressed(closeException);
- }
- throw e;
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (writer != null) {
- c.output(writer.close());
- writer = null;
- }
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
- .withLabel("Temporary File Prefix"));
- }
- }
+ builder
+ .addIfNotNull(DisplayData.item("table", jsonTableRef)
+ .withLabel("Table Reference"))
+ .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .withLabel("Table Schema"));
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
+ if (tableRefFunction != null) {
+ builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ .withLabel("Table Reference Function"));
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- builder
- .addIfNotNull(DisplayData.item("table", jsonTableRef)
- .withLabel("Table Reference"))
- .addIfNotNull(DisplayData.item("schema", jsonSchema)
- .withLabel("Table Schema"));
-
- if (tableRefFunction != null) {
- builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
- .withLabel("Table Reference Function"));
- }
+ builder
+ .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .withLabel("Table CreateDisposition"))
+ .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .withLabel("Table WriteDisposition"))
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"), true)
+ .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+ .withLabel("Table Description"));
+ }
- builder
- .add(DisplayData.item("createDisposition", createDisposition.toString())
- .withLabel("Table CreateDisposition"))
- .add(DisplayData.item("writeDisposition", writeDisposition.toString())
- .withLabel("Table WriteDisposition"))
- .addIfNotDefault(DisplayData.item("validation", validate)
- .withLabel("Validation Enabled"), true)
- .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
- .withLabel("Table Description"));
- }
+ /** Returns the create disposition. */
+ public CreateDisposition getCreateDisposition() {
+ return createDisposition;
+ }
- /** Returns the create disposition. */
- public CreateDisposition getCreateDisposition() {
- return createDisposition;
- }
+ /** Returns the write disposition. */
+ public WriteDisposition getWriteDisposition() {
+ return writeDisposition;
+ }
- /** Returns the write disposition. */
- public WriteDisposition getWriteDisposition() {
- return writeDisposition;
- }
+ /** Returns the table schema. */
+ public TableSchema getSchema() {
+ return fromJsonString(
+ jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ }
- /** Returns the table schema. */
- public TableSchema getSchema() {
- return fromJsonString(
- jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+ /**
+ * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+ *
+ * <p>If the table's project is not specified, use the executing project.
+ */
+ @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+ BigQueryOptions bqOptions) {
+ ValueProvider<TableReference> table = getTable();
+ if (table == null) {
+ return table;
}
-
- /**
- * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
- *
- * <p>If the table's project is not specified, use the executing project.
- */
- @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
- BigQueryOptions bqOptions) {
- ValueProvider<TableReference> table = getTable();
- if (table == null) {
- return table;
- }
- if (!table.isAccessible()) {
- LOG.info("Using a dynamic value for table input. This must contain a project"
- + " in the table reference: {}", table);
- return table;
- }
- if (Strings.isNullOrEmpty(table.get().getProjectId())) {
- // If user does not specify a project we assume the table to be located in
- // the default project.
- TableReference tableRef = table.get();
- tableRef.setProjectId(bqOptions.getProject());
- return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(tableRef)), new JsonTableRefToTableRef());
- }
+ if (!table.isAccessible()) {
+ LOG.info("Using a dynamic value for table input. This must contain a project"
+ + " in the table reference: {}", table);
return table;
}
-
- /** Returns the table reference, or {@code null}. */
- @Nullable
- public ValueProvider<TableReference> getTable() {
- return jsonTableRef == null ? null :
- NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+ // If the user does not specify a project, we assume the table is located in
+ // the default project.
+ TableReference tableRef = table.get();
+ tableRef.setProjectId(bqOptions.getProject());
+ return NestedValueProvider.of(StaticValueProvider.of(
+ toJsonString(tableRef)), new JsonTableRefToTableRef());
}
+ return table;
+ }
- /** Returns {@code true} if table validation is enabled. */
- public boolean getValidate() {
- return validate;
- }
+ /** Returns the table reference, or {@code null}. */
+ @Nullable
+ public ValueProvider<TableReference> getTable() {
+ return jsonTableRef == null ? null :
+ NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+ }
- private BigQueryServices getBigQueryServices() {
- if (bigQueryServices == null) {
- bigQueryServices = new BigQueryServicesImpl();
- }
- return bigQueryServices;
+ /** Returns {@code true} if table validation is enabled. */
+ public boolean getValidate() {
+ return validate;
+ }
+
+ private BigQueryServices getBigQueryServices() {
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
}
+ return bigQueryServices;
}
static class TableRowWriter {
@@ -2231,8 +2180,8 @@ public class BigQueryIO {
List<String> currResults = Lists.newArrayList();
for (int i = 0; i < results.size(); ++i) {
KV<String, Long> fileResult = results.get(i);
- if (currNumFiles + 1 > Bound.MAX_NUM_FILES
- || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+ if (currNumFiles + 1 > Write.MAX_NUM_FILES
+ || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
currResults = Lists.newArrayList();
currNumFiles = 0;
@@ -2331,13 +2280,13 @@ public class BigQueryIO {
String projectId = ref.getProjectId();
Job lastFailedLoadJob = null;
- for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+ for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
- Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
@@ -2361,7 +2310,7 @@ public class BigQueryIO {
"Failed to create load job with id prefix %s, "
+ "reached max retries: %d, last failed load job: %s.",
jobIdPrefix,
- Bound.MAX_RETRY_JOBS,
+ Write.MAX_RETRY_JOBS,
jobToPrettyString(lastFailedLoadJob)));
}
@@ -2477,13 +2426,13 @@ public class BigQueryIO {
String projectId = ref.getProjectId();
Job lastFailedCopyJob = null;
- for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+ for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
JobReference jobRef = new JobReference()
.setProjectId(projectId)
.setJobId(jobId);
jobService.startCopyJob(jobRef, copyConfig);
- Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+ Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
Status jobStatus = parseStatus(copyJob);
switch (jobStatus) {
case SUCCEEDED:
@@ -2507,7 +2456,7 @@ public class BigQueryIO {
"Failed to create copy job with id prefix %s, "
+ "reached max retries: %d, last failed copy job: %s.",
jobIdPrefix,
- Bound.MAX_RETRY_JOBS,
+ Write.MAX_RETRY_JOBS,
jobToPrettyString(lastFailedCopyJob)));
}
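The load-job and copy-job hunks above share the same retry shape: up to Write.MAX_RETRY_JOBS attempts, each with a deterministic job id built from the prefix plus the attempt index, each polled to completion before deciding whether to retry. A stripped-down, hypothetical skeleton of that pattern follows; names like JobAttempt and runWithRetries are illustrative and not part of the commit.

    /** Hypothetical stand-in for startLoadJob/startCopyJob followed by pollJob. */
    interface JobAttempt {
      /** Starts the job under the given id, blocks until it finishes, and reports success. */
      boolean succeeded(String jobId) throws InterruptedException;
    }

    static void runWithRetries(String jobIdPrefix, int maxRetryJobs, JobAttempt attempt)
        throws InterruptedException {
      for (int i = 0; i < maxRetryJobs; ++i) {
        // Deterministic per-attempt job id, mirroring jobIdPrefix + "-" + i in the hunks above.
        String jobId = jobIdPrefix + "-" + i;
        if (attempt.succeeded(jobId)) {
          return;
        }
        // On failure, fall through and retry under the next id.
      }
      throw new RuntimeException(String.format(
          "Failed after %d attempts with job id prefix %s.", maxRetryJobs, jobIdPrefix));
    }

Because LOAD_JOB_POLL_MAX_RETRIES is Integer.MAX_VALUE, each attempt effectively blocks until BigQuery reports a terminal state for the job.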
@@ -2536,9 +2485,6 @@ public class BigQueryIO {
.withLabel("Create Disposition"));
}
}
-
- /** Disallow construction of utility class. */
- private Write() {}
}
private static String jobToPrettyString(@Nullable Job job) throws IOException {
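Before the test-file portion of the diff: the WritePartition hunk above references the constants that moved from Bound to Write, namely Write.MAX_NUM_FILES and Write.MAX_SIZE_BYTES. Below is a self-contained sketch of that grouping rule, simplified to return plain lists instead of side outputs; it is illustrative only, and the partition ids and the single- vs multi-partition distinction are omitted.

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.values.KV;

    class PartitionSketch {
      // Mirrors Write.MAX_NUM_FILES and Write.MAX_SIZE_BYTES from the diff above.
      static final int MAX_NUM_FILES = 10000;
      static final long MAX_SIZE_BYTES = 11 * (1L << 40);

      /** Groups (fileName, sizeBytes) results into partitions capped by file count and total size. */
      static List<List<String>> groupIntoPartitions(List<KV<String, Long>> results) {
        List<List<String>> partitions = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int numFiles = 0;
        long sizeBytes = 0;
        for (KV<String, Long> result : results) {
          // Close the current partition if adding this file would exceed either cap.
          if (numFiles + 1 > MAX_NUM_FILES || sizeBytes + result.getValue() > MAX_SIZE_BYTES) {
            partitions.add(current);
            current = new ArrayList<>();
            numFiles = 0;
            sizeBytes = 0;
          }
          current.add(result.getKey());
          numFiles++;
          sizeBytes += result.getValue();
        }
        if (!current.isEmpty()) {
          partitions.add(current);
        }
        return partitions;
      }
    }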
http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bb1528b..f403c5a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -690,11 +690,11 @@ public class BigQueryIOTest implements Serializable {
}
private void checkWriteObject(
- BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+ BigQueryIO.Write write, String project, String dataset, String table,
TableSchema schema, CreateDisposition createDisposition,
WriteDisposition writeDisposition, String tableDescription) {
checkWriteObjectWithValidate(
- bound,
+ write,
project,
dataset,
table,
@@ -706,17 +706,17 @@ public class BigQueryIOTest implements Serializable {
}
private void checkWriteObjectWithValidate(
- BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+ BigQueryIO.Write write, String project, String dataset, String table,
TableSchema schema, CreateDisposition createDisposition,
WriteDisposition writeDisposition, String tableDescription, boolean validate) {
- assertEquals(project, bound.getTable().get().getProjectId());
- assertEquals(dataset, bound.getTable().get().getDatasetId());
- assertEquals(table, bound.getTable().get().getTableId());
- assertEquals(schema, bound.getSchema());
- assertEquals(createDisposition, bound.createDisposition);
- assertEquals(writeDisposition, bound.writeDisposition);
- assertEquals(tableDescription, bound.tableDescription);
- assertEquals(validate, bound.validate);
+ assertEquals(project, write.getTable().get().getProjectId());
+ assertEquals(dataset, write.getTable().get().getDatasetId());
+ assertEquals(table, write.getTable().get().getTableId());
+ assertEquals(schema, write.getSchema());
+ assertEquals(createDisposition, write.createDisposition);
+ assertEquals(writeDisposition, write.writeDisposition);
+ assertEquals(tableDescription, write.tableDescription);
+ assertEquals(validate, write.validate);
}
@Before
@@ -1328,10 +1328,10 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWrite() {
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@@ -1355,7 +1355,7 @@ public class BigQueryIOTest implements Serializable {
options.as(StreamingOptions.class).setStreaming(streaming);
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("project:dataset.table")
.withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
.withTestServices(new FakeBigQueryServices()
@@ -1375,10 +1375,10 @@ public class BigQueryIOTest implements Serializable {
public void testBuildWriteWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
checkWriteObjectWithValidate(
- bound,
+ write,
"foo.com:project",
"somedataset",
"sometable",
@@ -1391,9 +1391,9 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testBuildWriteDefaultProject() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
+ BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
checkWriteObject(
- bound, null, "somedataset", "sometable",
+ write, null, "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@@ -1403,89 +1403,80 @@ public class BigQueryIOTest implements Serializable {
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
+ BigQueryIO.Write write = BigQueryIO.Write.to(table);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
- @Category(NeedsRunner.class)
- public void testBuildWriteWithoutTable() {
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("must set the table reference");
- p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation());
- }
-
- @Test
public void testBuildWriteWithSchema() {
TableSchema schema = new TableSchema();
- BigQueryIO.Write.Bound bound =
+ BigQueryIO.Write write =
BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithCreateDispositionNever() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_NEVER);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithCreateDispositionIfNeeded() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithWriteDispositionTruncate() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
}
@Test
public void testBuildWriteWithWriteDispositionAppend() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
}
@Test
public void testBuildWriteWithWriteDispositionEmpty() {
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withWriteDisposition(WriteDisposition.WRITE_EMPTY);
checkWriteObject(
- bound, "foo.com:project", "somedataset", "sometable",
+ write, "foo.com:project", "somedataset", "sometable",
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
}
@Test
public void testBuildWriteWithWriteWithTableDescription() {
final String tblDescription = "foo bar table";
- BigQueryIO.Write.Bound bound = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to("foo.com:project:somedataset.sometable")
.withTableDescription(tblDescription);
checkWriteObject(
- bound,
+ write,
"foo.com:project",
"somedataset",
"sometable",
@@ -1501,7 +1492,7 @@ public class BigQueryIOTest implements Serializable {
TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
final String tblDescription = "foo bar table";
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to(tableSpec)
.withSchema(schema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1702,35 +1693,6 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testWriteValidateFailsTableAndTableSpec() {
- p.enableAbandonedNodeEnforcement(false);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Cannot set both a table reference and a table function");
- p
- .apply(Create.empty(TableRowJsonCoder.of()))
- .apply(BigQueryIO.Write
- .to("dataset.table")
- .to(new SerializableFunction<BoundedWindow, String>() {
- @Override
- public String apply(BoundedWindow input) {
- return null;
- }
- }));
- }
-
- @Test
- public void testWriteValidateFailsNoTableAndNoTableSpec() {
- p.enableAbandonedNodeEnforcement(false);
-
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
- p
- .apply(Create.empty(TableRowJsonCoder.of()))
- .apply("name", BigQueryIO.Write.withoutValidation());
- }
-
- @Test
public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
.withJobService(mockJobService)
@@ -2094,7 +2056,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionSinglePartition() throws Exception {
- long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+ long numFiles = BigQueryIO.Write.MAX_NUM_FILES;
long fileSize = 1;
// One partition is needed.
@@ -2104,7 +2066,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionManyFiles() throws Exception {
- long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+ long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3;
long fileSize = 1;
// One partition is needed for each group of BigQueryWrite.MAX_NUM_FILES files.
@@ -2115,7 +2077,7 @@ public class BigQueryIOTest implements Serializable {
@Test
public void testWritePartitionLargeFileSize() throws Exception {
long numFiles = 10;
- long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+ long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3;
// One partition is needed for each group of three files.
long expectedNumPartitions = 4;
@@ -2382,7 +2344,7 @@ public class BigQueryIOTest implements Serializable {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setTempLocation("gs://testbucket/testdir");
Pipeline pipeline = TestPipeline.create(options);
- BigQueryIO.Write.Bound write = BigQueryIO.Write
+ BigQueryIO.Write write = BigQueryIO.Write
.to(options.getOutputTable())
.withSchema(NestedValueProvider.of(
options.getOutputSchema(), new JsonSchemaToTableSchema()))