Posted to commits@beam.apache.org by re...@apache.org on 2022/10/05 21:03:20 UTC

[beam] branch master updated: Merge pull request #23505: opt in for schema update. addresses #23504

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7980cb9a35c Merge pull request #23505: opt in for schema update. addresses #23504
7980cb9a35c is described below

commit 7980cb9a35cdb37ffd57a43ec9a1bfa5e204f5d7
Author: Reuven Lax <re...@google.com>
AuthorDate: Wed Oct 5 14:03:12 2022 -0700

    Merge pull request #23505: opt in for schema update. addresses #23504
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 25 +++++++++++++++++++++-
 .../StorageApiDynamicDestinationsTableRow.java     |  7 ++++--
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   |  1 +
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 3a6280c3038..024dcb053b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1804,6 +1804,7 @@ public class BigQueryIO {
         .setUseBeamSchema(false)
         .setAutoSharding(false)
         .setPropagateSuccessful(true)
+        .setAutoSchemaUpdate(false)
         .setDeterministicRecordIdFn(null)
         .build();
   }
@@ -1947,6 +1948,8 @@ public class BigQueryIO {
 
     abstract Boolean getPropagateSuccessful();
 
+    abstract Boolean getAutoSchemaUpdate();
+
     @Experimental
     abstract @Nullable SerializableFunction<T, String> getDeterministicRecordIdFn();
 
@@ -2038,6 +2041,8 @@ public class BigQueryIO {
 
       abstract Builder<T> setPropagateSuccessful(Boolean propagateSuccessful);
 
+      abstract Builder<T> setAutoSchemaUpdate(Boolean autoSchemaUpdate);
+
       @Experimental
       abstract Builder<T> setDeterministicRecordIdFn(
           SerializableFunction<T, String> toUniqueIdFunction);
@@ -2555,6 +2560,17 @@ public class BigQueryIO {
     }
 
     /**
+     * If true, enables automatically detecting BigQuery table schema updates. If a message with
+     * unknown fields is processed, the BigQuery table is queried to see if the schema has been
+     * updated. This is intended for scenarios in which unknown fields are rare; otherwise calls to
+     * BigQuery will throttle the pipeline. Only supported when using one of the STORAGE_API insert
+     * methods.
+     */
+    public Write<T> withAutoSchemaUpdate(boolean autoSchemaUpdate) {
+      return toBuilder().setAutoSchemaUpdate(autoSchemaUpdate).build();
+    }
+
+    /**
      * Provides a function which can serve as a source of deterministic unique ids for each record
      * to be written, replacing the unique ids generated with the default scheme. When used with
      * {@link Method#STREAMING_INSERTS} This also elides the re-shuffle from the BigQueryIO Write by
@@ -2750,6 +2766,12 @@ public class BigQueryIO {
             method);
       }
 
+      if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
+        checkArgument(
+            !getAutoSchemaUpdate(),
+            "withAutoSchemaUpdate only supported when using storage-api writes.");
+      }
+
       if (method != Write.Method.FILE_LOADS) {
         // we only support writing avro for FILE_LOADS
         checkArgument(
@@ -3045,7 +3067,8 @@ public class BigQueryIO {
                   tableRowWriterFactory.getToRowFn(),
                   getCreateDisposition(),
                   getIgnoreUnknownValues(),
-                  bqOptions.getSchemaUpdateRetries());
+                  bqOptions.getSchemaUpdateRetries(),
+                  getAutoSchemaUpdate());
         }
 
         StorageApiLoads<DestinationT, T> storageApiLoads =
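
For context on the API surface added above, a minimal usage sketch of the new opt-in follows.
The snippet is not part of this commit; the class name and the project, dataset, and table names
are hypothetical, and it assumes an existing table so CREATE_NEVER can be used without a schema.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class AutoSchemaUpdateExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(new TableRow().set("existing_field", "value"))
                .withCoder(TableRowJsonCoder.of()))
            .apply(BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // hypothetical destination table
                // The opt-in is only accepted for the Storage Write API insert methods.
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                // New in this change: re-check the table schema when a row carries unknown
                // fields instead of failing immediately. Defaults to false.
                .withAutoSchemaUpdate(true)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
        p.run().waitUntilFinish();
      }
    }
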
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
index b33b220de5a..b025d01f02b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java
@@ -40,6 +40,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
   private final CreateDisposition createDisposition;
   private final boolean ignoreUnknownValues;
   private final int schemaUpdateRetries;
+  private final boolean autoSchemaUpdates;
   private static final TableSchemaCache SCHEMA_CACHE =
       new TableSchemaCache(Duration.standardSeconds(1));
   private static final Logger LOG =
@@ -54,12 +55,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
       SerializableFunction<T, TableRow> formatFunction,
       CreateDisposition createDisposition,
       boolean ignoreUnknownValues,
-      int schemaUpdateRetries) {
+      int schemaUpdateRetries,
+      boolean autoSchemaUpdates) {
     super(inner);
     this.formatFunction = formatFunction;
     this.createDisposition = createDisposition;
     this.ignoreUnknownValues = ignoreUnknownValues;
     this.schemaUpdateRetries = schemaUpdateRetries;
+    this.autoSchemaUpdates = autoSchemaUpdates;
   }
 
   static void clearSchemaCache() throws ExecutionException, InterruptedException {
@@ -180,7 +183,7 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
                     ignoreUnknownValues);
             return new AutoValue_StorageApiWritePayload(msg.toByteArray(), localDescriptorHash);
           } catch (SchemaTooNarrowException e) {
-            if (attempt > schemaUpdateRetries) {
+            if (!autoSchemaUpdates || attempt > schemaUpdateRetries) {
               throw e;
             }
             // The input record has fields not found in the schema, and ignoreUnknownValues=false.
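
Read together with the hunk above, the behavior change amounts to the small gate sketched below.
This helper is illustrative only and not part of the patch; the parameter names mirror the fields
used in this class.

    final class SchemaUpdateGate {
      // Mirrors the condition added above: without the withAutoSchemaUpdate(true) opt-in a
      // SchemaTooNarrowException is rethrown immediately; with it, the cached table schema is
      // refreshed and the conversion retried, at most schemaUpdateRetries times.
      static boolean shouldRefreshSchemaAndRetry(
          boolean autoSchemaUpdates, int attempt, int schemaUpdateRetries) {
        return autoSchemaUpdates && attempt <= schemaUpdateRetries;
      }
    }
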
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index ae4285c19bb..7f529bfa348 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -1896,6 +1896,7 @@ public class BigQueryIOWriteTest implements Serializable {
             .to(tableRef)
             .withMethod(method)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .withAutoSchemaUpdate(true)
             .withTestServices(fakeBqServices)
             .withoutValidation());