Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/22 20:07:50 UTC

[GitHub] [beam] reuvenlax commented on a diff in pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395

reuvenlax commented on code in PR #24147:
URL: https://github.com/apache/beam/pull/24147#discussion_r1029773805


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -72,128 +65,86 @@ static void clearSchemaCache() throws ExecutionException, InterruptedException {
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      @Nullable TableSchema tableSchema;
-      TableRowToStorageApiProto.SchemaInformation schemaInformation;
-      Descriptor descriptor;
-      long descriptorHash;
+    return new TableRowConverter(destination, datasetService);
+  }
 
-      {
-        tableSchema = getSchema(destination);
-        TableReference tableReference = getTable(destination).getTableReference();
-        if (tableSchema == null) {
-          // If the table already exists, then try and fetch the schema from the existing
-          // table.
-          tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-          if (tableSchema == null) {
-            if (createDisposition == CreateDisposition.CREATE_NEVER) {
-              throw new RuntimeException(
-                  "BigQuery table "
-                      + tableReference
-                      + " not found. If you wanted to "
-                      + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
-                      + "schema.");
-            } else {
-              throw new RuntimeException(
-                  "Schema must be set for table "
-                      + tableReference
-                      + " when writing TableRows using Storage API and "
-                      + "using a create disposition of CREATE_IF_NEEDED.");
-            }
-          }
-        } else {
-          // Make sure we register this schema with the cache, unless there's already a more
-          // up-to-date schema.
-          tableSchema =
-              MoreObjects.firstNonNull(
-                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
-        }
-        schemaInformation =
-            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-        descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class TableRowConverter implements MessageConverter<T> {
+    final @Nullable TableSchema tableSchema;
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        synchronized (this) {
-          return new DescriptorWrapper(descriptor, descriptorHash);
-        }
-      }
+    TableRowConverter(
+        TableSchema tableSchema,

Review Comment:
   this.tableSchema - the JSON table schema. The Beam API is written in terms of this schema, and it is usually what users give us.
   
   this.protoTableSchema - the result of translating the JSON schema into the proto TableSchema.
   
   this.schemaInformation - extra information precomputed from the schema so that converting a JSON row to a proto is easy.
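
   As a rough illustration of how these three fields relate, here is a self-contained sketch. Every type and method in it (JsonField, ProtoField, ProtoType, toProtoSchema, buildSchemaInformation, convertRow) is a hypothetical stand-in invented for this example; none of them are the Beam or BigQuery classes, and the real conversion handles many more types and nested fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only: these are NOT the Beam or BigQuery classes.
class ConverterFieldsSketch {

  /** Stand-in for one field of the JSON table schema that users supply. */
  static final class JsonField {
    final String name;
    final String type; // string-typed, e.g. "STRING" or "INTEGER"

    JsonField(String name, String type) {
      this.name = name;
      this.type = type;
    }
  }

  /** Stand-in for the field types of the proto table schema. */
  enum ProtoType { STRING, INT64 }

  /** Stand-in for one field of the proto table schema. */
  static final class ProtoField {
    final String name;
    final ProtoType type;

    ProtoField(String name, ProtoType type) {
      this.name = name;
      this.type = type;
    }
  }

  /** Role of protoTableSchema: the JSON schema translated into proto form. */
  static List<ProtoField> toProtoSchema(List<JsonField> jsonSchema) {
    List<ProtoField> result = new ArrayList<>();
    for (JsonField field : jsonSchema) {
      ProtoType type;
      switch (field.type) {
        case "STRING":
          type = ProtoType.STRING;
          break;
        case "INTEGER":
          type = ProtoType.INT64;
          break;
        default:
          throw new IllegalArgumentException("Unsupported type: " + field.type);
      }
      result.add(new ProtoField(field.name, type));
    }
    return result;
  }

  /** Role of schemaInformation: a lookup precomputed once per schema. */
  static Map<String, ProtoType> buildSchemaInformation(List<ProtoField> protoSchema) {
    Map<String, ProtoType> byName = new HashMap<>();
    for (ProtoField field : protoSchema) {
      byName.put(field.name, field.type);
    }
    return byName;
  }

  /** Converts one JSON-style row using the precomputed lookup. */
  static Map<String, Object> convertRow(
      Map<String, Object> jsonRow, Map<String, ProtoType> schemaInformation) {
    Map<String, Object> converted = new HashMap<>();
    for (Map.Entry<String, Object> entry : jsonRow.entrySet()) {
      ProtoType type = schemaInformation.get(entry.getKey());
      if (type == null) {
        throw new IllegalArgumentException("Unknown field: " + entry.getKey());
      }
      String raw = String.valueOf(entry.getValue());
      if (type == ProtoType.INT64) {
        converted.put(entry.getKey(), Long.valueOf(raw));
      } else {
        converted.put(entry.getKey(), raw);
      }
    }
    return converted;
  }

  public static void main(String[] args) {
    List<JsonField> jsonSchema = new ArrayList<>();
    jsonSchema.add(new JsonField("id", "INTEGER"));
    jsonSchema.add(new JsonField("name", "STRING"));

    List<ProtoField> protoSchema = toProtoSchema(jsonSchema);
    Map<String, ProtoType> schemaInformation = buildSchemaInformation(protoSchema);

    Map<String, Object> row = new HashMap<>();
    row.put("id", "42");
    row.put("name", "beam");
    System.out.println(convertRow(row, schemaInformation));
  }
}
```

   The point of the sketch is only the ordering: the JSON schema is translated once, the lookup is derived from that schema once, and each row conversion then consults the lookup instead of walking the schema again.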


