Posted to github@beam.apache.org by "ahmedabu98 (via GitHub)" <gi...@apache.org> on 2023/02/10 15:42:41 UTC

[GitHub] [beam] ahmedabu98 commented on a diff in pull request #25410: Fix UpdateSchemaDestination breaking DynamicDestination in Bigquery BatchLoad

ahmedabu98 commented on code in PR #25410:
URL: https://github.com/apache/beam/pull/25410#discussion_r1102800138


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java:
##########
@@ -68,6 +68,7 @@ static String createJobIdWithDestination(
   public enum JobType {
     LOAD,
     TEMP_TABLE_LOAD,
+    ZERO_LOAD,

Review Comment:
   Would `SCHEMA_UPDATE` be more informative? wdyt?
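A descriptive constant name matters most where it surfaces at runtime, for example in job identifiers seen while debugging. The sketch below assumes, for illustration only, that the job type's name is folded into a generated job ID; the `jobId` helper and its format are hypothetical and are not Beam's `createJobIdWithDestination`.

```java
import java.util.Locale;

/** Hypothetical sketch: job types named after what each job actually does. */
public class JobTypeNaming {
    enum JobType {
        LOAD,
        TEMP_TABLE_LOAD,
        // Proposed in review instead of ZERO_LOAD: names the job's purpose
        // (updating a destination's schema) rather than its row count.
        SCHEMA_UPDATE
    }

    // Hypothetical helper, not Beam's API: embeds the lower-cased enum name in an ID.
    static String jobId(String prefix, JobType type, String destinationHash) {
        return prefix + "_" + type.name().toLowerCase(Locale.ROOT) + "_" + destinationHash;
    }

    public static void main(String[] args) {
        // prints "myjob_schema_update_abc123"
        System.out.println(jobId("myjob", JobType.SCHEMA_UPDATE, "abc123"));
    }
}
```

With `ZERO_LOAD` the same ID would read `myjob_zero_load_abc123`, which says little about why the job ran.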



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java:
##########
@@ -471,7 +495,7 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
         writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
 
     List<PCollectionView<?>> sideInputsForUpdateSchema =
-        Lists.newArrayList(tempLoadJobIdPrefixView);
+        Lists.newArrayList(zeroLoadJobIdPrefixView);
     sideInputsForUpdateSchema.addAll(dynamicDestinations.getSideInputs());
 
     PCollection<TableDestination> successfulMultiPartitionWrites =

Review Comment:
   Would it make sense to add a GBK after `writeTempTables`? Regarding your [comment on the issue](https://github.com/apache/beam/issues/25355#issuecomment-1423372093), we'd end up with a `PCollection<KV<DestinationT, Iterable<WriteTables.Result>>>`. It would also protect `writeTempTables` from being retried if `UpdateSchemaDestination` fails.
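The shape a GBK would produce can be sketched in plain Java: results keyed by destination, with each destination's results collected together so a downstream step sees them all at once. This is an analogue of `GroupByKey` semantics, not Beam code; the `Result` record is a hypothetical stand-in for `WriteTables.Result`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java analogue (not Beam) of grouping temp-table load results by destination. */
public class GroupResultsByDestination {
    // Hypothetical stand-in for WriteTables.Result: destination plus the temp table written.
    record Result(String destination, String tempTable) {}

    // Mirrors KV<DestinationT, Iterable<Result>>: one entry per destination.
    static Map<String, List<String>> groupByDestination(List<Result> results) {
        return results.stream()
            .collect(Collectors.groupingBy(
                Result::destination,
                TreeMap::new, // sorted keys for deterministic output
                Collectors.mapping(Result::tempTable, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Result> results = List.of(
            new Result("dataset.table_a", "tmp_a_0"),
            new Result("dataset.table_b", "tmp_b_0"),
            new Result("dataset.table_a", "tmp_a_1"));
        // A downstream schema-update step could then run once per destination.
        groupByDestination(results).forEach((dest, temps) ->
            System.out.println("update schema for " + dest + " using " + temps));
    }
}
```

In the pipeline itself this would be a Beam `GroupByKey`. Whether it also shields `writeTempTables` from retries depends on the runner persisting data at the shuffle boundary, which this sketch does not model.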



Review Comment:
   I see this would also need some changes in `WriteRename`, so perhaps this could be an improvement for a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.