Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/11 21:21:36 UTC

[GitHub] [beam] yirutang commented on a diff in pull request #24145: Handle updates to table schema when using Storage API writes.

yirutang commented on code in PR #24145:
URL: https://github.com/apache/beam/pull/24145#discussion_r1067366424


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -364,9 +395,37 @@ void invalidateWriteStream() {
         }
       }
 
-      void addMessage(StorageApiWritePayload payload) throws Exception {
+      void addMessage(
+          StorageApiWritePayload payload,
+          OutputReceiver<BigQueryStorageApiInsertError> failedRowsReceiver)
+          throws Exception {
         maybeTickleCache();
         ByteString payloadBytes = ByteString.copyFrom(payload.getPayload());
+        if (autoUpdateSchema) {
+          if (appendClientInfo == null) {
+            appendClientInfo = getAppendClientInfo(true, null);
+          }
+          @Nullable TableRow unknownFields = payload.getUnknownFields();

Review Comment:
   If ignoreUnknownValues is false, could we skip everything that follows (to save some processing time)?
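   A minimal sketch of the suggested short-circuit. It assumes the unknown-field handling can be skipped outright when ignoreUnknownValues is false, which is the open question in this thread. The class SkipUnknownFields and its methods are hypothetical stand-ins, not Beam APIs. TableRow and ByteString are modeled as a Map and a byte[] so the example runs on its own.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the short-circuit suggested in the review; not Beam code. */
public final class SkipUnknownFields {
  /** Counts how often the stand-in for the per-row unknown-field work runs. */
  static int encodeCalls = 0;

  // Stand-in for the work done after the guard in addMessage(). The placeholder
  // encoding is a sorted toString, not a serialized proto.
  static byte[] encodeUnknownFields(Map<String, Object> unknown) {
    encodeCalls++;
    return new TreeMap<>(unknown).toString().getBytes(StandardCharsets.UTF_8);
  }

  static byte[] buildPayload(
      byte[] payload,
      Map<String, Object> unknown,
      boolean autoUpdateSchema,
      boolean ignoreUnknownValues) {
    // Suggested guard: do no extra work unless both flags are set and there is
    // something to encode.
    if (!autoUpdateSchema || !ignoreUnknownValues || unknown == null || unknown.isEmpty()) {
      return payload;
    }
    // Assumption: the encoded unknown fields are appended to the serialized row.
    byte[] extra = encodeUnknownFields(unknown);
    byte[] out = Arrays.copyOf(payload, payload.length + extra.length);
    System.arraycopy(extra, 0, out, payload.length, extra.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] row = {1, 2, 3};
    byte[] skipped = buildPayload(row, Map.of("extra", 7), true, false);
    System.out.println(skipped == row); // true: the same array is returned untouched
    System.out.println(encodeCalls); // 0
    byte[] merged = buildPayload(row, Map.of("extra", 7), true, true);
    System.out.println(merged.length); // 12
    System.out.println(encodeCalls); // 1
  }
}
```

   Checking the flags before any per-row work lets rows take the cheap path without touching the unknown-field map. Whether that is safe depends on how rows with unknown fields are meant to fail when ignoreUnknownValues is false, and the sketch does not model that.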



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AppendClientInfo.java:
##########
@@ -28,35 +35,111 @@
  * StorageApiWritesShardedRecords} to encapsulate a destination {@link TableSchema} along with a
  * {@link BigQueryServices.StreamAppendClient} and other objects needed to write records.
  */
-class AppendClientInfo {
-  @Nullable BigQueryServices.StreamAppendClient streamAppendClient;
-  @Nullable TableSchema tableSchema;
-  Consumer<BigQueryServices.StreamAppendClient> closeAppendClient;
-  Descriptors.Descriptor descriptor;
+@AutoValue
+abstract class AppendClientInfo {
+  abstract @Nullable BigQueryServices.StreamAppendClient getStreamAppendClient();
 
-  public AppendClientInfo(
+  abstract TableSchema getTableSchema();
+
+  abstract Consumer<BigQueryServices.StreamAppendClient> getCloseAppendClient();
+
+  abstract com.google.api.services.bigquery.model.TableSchema getJsonTableSchema();
+
+  abstract TableRowToStorageApiProto.SchemaInformation getSchemaInformation();
+
+  abstract Descriptors.Descriptor getDescriptor();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setStreamAppendClient(@Nullable BigQueryServices.StreamAppendClient value);
+
+    abstract Builder setTableSchema(TableSchema value);
+
+    abstract Builder setCloseAppendClient(Consumer<BigQueryServices.StreamAppendClient> value);
+
+    abstract Builder setJsonTableSchema(com.google.api.services.bigquery.model.TableSchema value);
+
+    abstract Builder setSchemaInformation(TableRowToStorageApiProto.SchemaInformation value);
+
+    abstract Builder setDescriptor(Descriptors.Descriptor value);
+
+    abstract AppendClientInfo build();
+  };
+
+  abstract Builder toBuilder();
+
+  static AppendClientInfo of(
       TableSchema tableSchema, Consumer<BigQueryServices.StreamAppendClient> closeAppendClient)
       throws Exception {
-    this.tableSchema = tableSchema;
-    this.descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true);
-    this.closeAppendClient = closeAppendClient;
+    return new AutoValue_AppendClientInfo.Builder()
+        .setTableSchema(tableSchema)
+        .setCloseAppendClient(closeAppendClient)
+        .setJsonTableSchema(TableRowToStorageApiProto.protoSchemaToTableSchema(tableSchema))
+        .setSchemaInformation(
+            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema))
+        .setDescriptor(TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema, true))
+        .build();
   }
 
-  public AppendClientInfo createAppendClient(
+  public AppendClientInfo withNoAppendClient() {
+    return toBuilder().setStreamAppendClient(null).build();
+  }
+
+  public AppendClientInfo withAppendClient(
       BigQueryServices.DatasetService datasetService,
       Supplier<String> getStreamName,
       boolean useConnectionPool)
       throws Exception {
-    if (streamAppendClient == null) {
-      this.streamAppendClient =
-          datasetService.getStreamAppendClient(getStreamName.get(), descriptor, useConnectionPool);
+    if (getStreamAppendClient() != null) {
+      return this;
+    } else {
+      return toBuilder()
+          .setStreamAppendClient(
+              datasetService.getStreamAppendClient(
+                  getStreamName.get(), getDescriptor(), useConnectionPool))
+          .build();
     }
-    return this;
   }
 
   public void close() {
-    if (streamAppendClient != null) {
-      closeAppendClient.accept(streamAppendClient);
+    BigQueryServices.StreamAppendClient client = getStreamAppendClient();
+    if (client != null) {
+      getCloseAppendClient().accept(client);
+    }
+  }
+
+  boolean hasSchemaChanged(TableSchema updatedTableSchema) {
+    return updatedTableSchema.hashCode() != getTableSchema().hashCode();
+  }
+
+  public ByteString encodeUnknownFields(TableRow unknown, boolean ignoreUnknownValues)

Review Comment:
   If ignoreUnknownValues is false, will this be a no-op?
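   If the answer is yes, the guard could live inside the helper instead of at each call site. This is a hedged sketch. It assumes "no-op" means returning an empty encoding that the caller appends to the row payload, presumably through something like ByteString.concat. EncodeUnknownFieldsSketch and its methods are hypothetical and do not reproduce the actual signature or behavior of AppendClientInfo.encodeUnknownFields.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of a no-op contract for an encodeUnknownFields-style helper; not Beam code. */
public final class EncodeUnknownFieldsSketch {
  static final byte[] EMPTY = new byte[0];

  // Returns an empty encoding when ignoreUnknownValues is false (the "no-op" case asked about).
  static byte[] encodeUnknownFields(Map<String, Object> unknown, boolean ignoreUnknownValues) {
    if (!ignoreUnknownValues || unknown.isEmpty()) {
      return EMPTY;
    }
    // Placeholder encoding (sorted for determinism); the real helper returns a ByteString.
    return new TreeMap<>(unknown).toString().getBytes(StandardCharsets.UTF_8);
  }

  // Appends the encoded unknown fields to the serialized row (assumed call-site behavior).
  static byte[] append(byte[] payload, byte[] extra) {
    byte[] out = Arrays.copyOf(payload, payload.length + extra.length);
    System.arraycopy(extra, 0, out, payload.length, extra.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] row = {1, 2, 3};
    byte[] unchanged = append(row, encodeUnknownFields(Map.of("extra", 7), false));
    System.out.println(Arrays.equals(row, unchanged)); // true
    byte[] extended = append(row, encodeUnknownFields(Map.of("extra", 7), true));
    System.out.println(extended.length); // 12
  }
}
```

   Appending an empty encoding leaves the payload byte-for-byte unchanged, so callers need no extra branch. The trade-off is that the helper is still invoked for every row. A guard at the call site, as suggested in the earlier comment, avoids even that.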



-- 
This is an automated message from the Apache Git Service.