You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/10/05 21:03:20 UTC
[beam] branch master updated: Merge pull request #23505: opt in for schema update. addresses #23504
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7980cb9a35c Merge pull request #23505: opt in for schema update. addresses #23504
7980cb9a35c is described below
commit 7980cb9a35cdb37ffd57a43ec9a1bfa5e204f5d7
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Oct 5 14:03:12 2022 -0700
Merge pull request #23505: opt in for schema update. addresses #23504
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 25 +++++++++++++++++++++-
.../StorageApiDynamicDestinationsTableRow.java | 7 ++++--
.../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java | 1 +
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a6280c3038..024dcb053b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1804,6 +1804,7 @@ public class BigQueryIO {
.setUseBeamSchema(false)
.setAutoSharding(false)
.setPropagateSuccessful(true)
+ .setAutoSchemaUpdate(false)
.setDeterministicRecordIdFn(null)
.build();
}
@@ -1947,6 +1948,8 @@ public class BigQueryIO {
abstract Boolean getPropagateSuccessful();
+ abstract Boolean getAutoSchemaUpdate();
+
@Experimental
abstract @Nullable SerializableFunction<T, String> getDeterministicRecordIdFn();
@@ -2038,6 +2041,8 @@ public class BigQueryIO {
abstract Builder<T> setPropagateSuccessful(Boolean propagateSuccessful);
+ abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
+
@Experimental
abstract Builder<T> setDeterministicRecordIdFn(
SerializableFunction<T, String> toUniqueIdFunction);
@@ -2555,6 +2560,17 @@ public class BigQueryIO {
}
/**
+ * If true, enables automatically detecting BigQuery table schema updates. If a message with
+ * unknown fields is processed, the BigQuery table is checked to see if the schema has been
+ * updated. This is intended for scenarios in which unknown fields are rare, otherwise calls to
+ * BigQuery will throttle the pipeline. Only supported when using one of the STORAGE_API insert
+ * methods.
+ */
+ public Write<T> withAutoSchemaUpdate(boolean autoSchemaUpdate) {
+ return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build();
+ }
+
+ /**
* Provides a function which can serve as a source of deterministic unique ids for each record
* to be written, replacing the unique ids generated with the default scheme. When used with
* {@link Method#STREAMING_INSERTS} This also elides the re-shuffle from the BigQueryIO Write by
@@ -2750,6 +2766,12 @@ public class BigQueryIO {
method);
}
+ if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
+ checkArgument(
+ !getAutoSchemaUpdate(),
+ "withAutoSchemaUpdate only supported when using storage-api writes.");
+ }
+
if (method != Write.Method.FILE_LOADS) {
// we only support writing avro for FILE_LOADS
checkArgument(
@@ -3045,7 +3067,8 @@ public class BigQueryIO {
tableRowWriterFactory.getToRowFn(),
getCreateDisposition(),
getIgnoreUnknownValues(),
- bqOptions.getSchemaUpdateRetries());
+ bqOptions.getSchemaUpdateRetries(),
+ getAutoSchemaUpdate());
}
StorageApiLoads<DestinationT, T> storageApiLoads =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index b33b220de5a..b025d01f02b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -40,6 +40,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
private final CreateDisposition createDisposition;
private final boolean ignoreUnknownValues;
private final int schemaUpdateRetries;
+ private final boolean autoSchemaUpdate;
private static final TableSchemaCache SCHEMA_CACHE =
new TableSchemaCache(Duration.standardSeconds(1));
private static final Logger LOG =
@@ -54,12 +55,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
SerializableFunction<T, TableRow> formatFunction,
CreateDisposition createDisposition,
boolean ignoreUnknownValues,
- int schemaUpdateRetries) {
+ int schemaUpdateRetries,
+ boolean autoSchemaUpdate) {
super(inner);
this.formatFunction = formatFunction;
this.createDisposition = createDisposition;
this.ignoreUnknownValues = ignoreUnknownValues;
this.schemaUpdateRetries = schemaUpdateRetries;
+ this.autoSchemaUpdate = autoSchemaUpdate;
}
static void clearSchemaCache() throws ExecutionException, InterruptedException {
@@ -180,7 +183,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
ignoreUnknownValues);
return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
} catch (SchemaTooNarrowException e) {
- if (attempt > schemaUpdateRetries) {
+ if (!autoSchemaUpdate || attempt > schemaUpdateRetries) {
throw e;
}
// The input record has fields not found in the schema, and ignoreUnknownValues=false.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index ae4285c19bb..7f529bfa348 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1896,6 +1896,7 @@ public class BigQueryIOWriteTest implements Serializable {
.to(tableRef)
.withMethod(method)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+ .withAutoSchemaUpdate(true)
.withTestServices(fakeBqServices)
.withoutValidation());