Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/22 01:07:55 UTC

[GitHub] [beam] yirutang commented on a diff in pull request #24147: First step in adding schema update to Storage API sink. Refactor code #21395

yirutang commented on code in PR #24147:
URL: https://github.com/apache/beam/pull/24147#discussion_r1028652574


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinationsTableRow.java:
##########
@@ -72,128 +65,86 @@ static void clearSchemaCache() throws ExecutionException, InterruptedException {
   @Override
   public MessageConverter<T> getMessageConverter(
       DestinationT destination, DatasetService datasetService) throws Exception {
-    return new MessageConverter<T>() {
-      @Nullable TableSchema tableSchema;
-      TableRowToStorageApiProto.SchemaInformation schemaInformation;
-      Descriptor descriptor;
-      long descriptorHash;
+    return new TableRowConverter(destination, datasetService);
+  }
 
-      {
-        tableSchema = getSchema(destination);
-        TableReference tableReference = getTable(destination).getTableReference();
-        if (tableSchema == null) {
-          // If the table already exists, then try and fetch the schema from the existing
-          // table.
-          tableSchema = SCHEMA_CACHE.getSchema(tableReference, datasetService);
-          if (tableSchema == null) {
-            if (createDisposition == CreateDisposition.CREATE_NEVER) {
-              throw new RuntimeException(
-                  "BigQuery table "
-                      + tableReference
-                      + " not found. If you wanted to "
-                      + "automatically create the table, set the create disposition to CREATE_IF_NEEDED and specify a "
-                      + "schema.");
-            } else {
-              throw new RuntimeException(
-                  "Schema must be set for table "
-                      + tableReference
-                      + " when writing TableRows using Storage API and "
-                      + "using a create disposition of CREATE_IF_NEEDED.");
-            }
-          }
-        } else {
-          // Make sure we register this schema with the cache, unless there's already a more
-          // up-to-date schema.
-          tableSchema =
-              MoreObjects.firstNonNull(
-                  SCHEMA_CACHE.putSchemaIfAbsent(tableReference, tableSchema), tableSchema);
-        }
-        schemaInformation =
-            TableRowToStorageApiProto.SchemaInformation.fromTableSchema(tableSchema);
-        descriptor = TableRowToStorageApiProto.getDescriptorFromTableSchema(tableSchema);
-        descriptorHash = BigQueryUtils.hashSchemaDescriptorDeterministic(descriptor);
-      }
+  class TableRowConverter implements MessageConverter<T> {
+    final @Nullable TableSchema tableSchema;
+    final com.google.cloud.bigquery.storage.v1.TableSchema protoTableSchema;
+    final TableRowToStorageApiProto.SchemaInformation schemaInformation;
+    final Descriptor descriptor;
 
-      @Override
-      public DescriptorWrapper getSchemaDescriptor() {
-        synchronized (this) {
-          return new DescriptorWrapper(descriptor, descriptorHash);
-        }
-      }
+    TableRowConverter(
+        TableSchema tableSchema,

Review Comment:
   Why are there 3 types of schema here? What do they mean?
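The three schema fields all come from one resolved table schema. The initializer block removed by this hunk resolves that schema in three steps:

1. If a schema was supplied for the destination, it is registered with `SCHEMA_CACHE` through `putSchemaIfAbsent`, and any already-cached (more up-to-date) schema wins.
2. If no schema was supplied, the schema of the existing table is fetched through the cache.
3. If neither is available, an error is thrown whose message depends on the create disposition.

What follows is a minimal, self-contained sketch of that control flow only. It is an assumption-laden stand-in, not Beam's API. `SchemaResolver`, `SchemaCache`, the local `CreateDisposition` enum, and the use of plain strings for table references and schemas are all hypothetical. The real cache stores `TableSchema` objects and reads missing schemas from BigQuery through a `DatasetService`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for BigQueryIO's CreateDisposition.
enum CreateDisposition { CREATE_NEVER, CREATE_IF_NEEDED }

// Hypothetical string-keyed cache; strings stand in for TableReference/TableSchema.
final class SchemaCache {
  private final ConcurrentMap<String, String> schemas = new ConcurrentHashMap<>();

  // Returns the known schema, or null. (The real cache would instead look up
  // the existing table's schema in BigQuery when nothing is cached.)
  String getSchema(String tableRef) {
    return schemas.get(tableRef);
  }

  // Returns the previously cached schema, or null if this call inserted `schema`.
  String putSchemaIfAbsent(String tableRef, String schema) {
    return schemas.putIfAbsent(tableRef, schema);
  }
}

final class SchemaResolver {
  static String resolve(
      String tableRef, String userSchema, SchemaCache cache, CreateDisposition disposition) {
    if (userSchema == null) {
      // No schema supplied: fall back to the schema of the existing table.
      String existing = cache.getSchema(tableRef);
      if (existing == null) {
        if (disposition == CreateDisposition.CREATE_NEVER) {
          throw new RuntimeException(
              "BigQuery table " + tableRef + " not found. If you wanted to automatically"
                  + " create the table, set the create disposition to CREATE_IF_NEEDED"
                  + " and specify a schema.");
        }
        throw new RuntimeException(
            "Schema must be set for table " + tableRef + " when writing TableRows using"
                + " Storage API and using a create disposition of CREATE_IF_NEEDED.");
      }
      return existing;
    }
    // Schema supplied: register it unless a more up-to-date schema is already
    // cached, in which case the cached one wins (like MoreObjects.firstNonNull).
    String previous = cache.putSchemaIfAbsent(tableRef, userSchema);
    return previous != null ? previous : userSchema;
  }
}
```

Because the cache is consulted first on both paths, every converter created for the same table sees the same schema. A schema update therefore only needs to replace the cached entry.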



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org