Posted to commits@beam.apache.org by jo...@apache.org on 2022/08/26 16:25:22 UTC

[beam] branch master updated: Pass user specified destination type to UpdateSchemaDestination (#22624) fixing #22543

This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3217017aee8 Pass user specified destination type to UpdateSchemaDestination  (#22624) fixing #22543
3217017aee8 is described below

commit 3217017aee8e4a0a95f0153a541c6626ca1c1d4a
Author: Ahmed Abualsaud <65...@users.noreply.github.com>
AuthorDate: Fri Aug 26 12:25:15 2022 -0400

    Pass user specified destination type to UpdateSchemaDestination  (#22624) fixing #22543
    
    * keeping hold of user specified dynamic destination type to be able to use it in UpdateSchemaDestinations
    
    * fix for testWriteTables
    
    * cleanup and support default project when not included in table ref
    
    * allow side inputs called from getTable()
    
    * style fixes
---
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java       | 48 ++++++++++++++++------
 .../io/gcp/bigquery/UpdateSchemaDestination.java   | 37 +++++++++++++----
 .../beam/sdk/io/gcp/bigquery/WriteTables.java      | 15 ++++---
 .../sdk/io/gcp/bigquery/BigQueryIOWriteTest.java   | 22 ++++++++--
 4 files changed, 94 insertions(+), 28 deletions(-)
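
For context, the scenario this change targets is BigQueryIO batch loads (FILE_LOADS) with a user-defined dynamic destination type, where getTable() may consult side inputs and may return a table spec without a project. Below is a hypothetical pipeline sketch exercising that path; every name in it is illustrative and none of it is taken from the commit or its tests.

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.Collections;
    import java.util.EnumSet;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    // Hypothetical pipeline for the path this commit touches: FILE_LOADS to
    // dynamic destinations keyed by a user type (String here), where
    // getTable() returns a table spec with no project set.
    public class DynamicDestinationLoadSketch {

      static class ByUserKey extends DynamicDestinations<TableRow, String> {
        @Override
        public String getDestination(ValueInSingleWindow<TableRow> element) {
          return (String) element.getValue().get("user");
        }

        @Override
        public TableDestination getTable(String user) {
          // No project in the spec: with this commit, the batch-load path
          // fills it in from --bigQueryProject, falling back to --project.
          return new TableDestination("dataset.events_" + user, null);
        }

        @Override
        public TableSchema getSchema(String user) {
          return new TableSchema()
              .setFields(
                  Collections.singletonList(
                      new TableFieldSchema().setName("user").setType("STRING")));
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(new TableRow().set("user", "alice")).withCoder(TableRowJsonCoder.of()))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(new ByUserKey())
                    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withSchemaUpdateOptions(
                        EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)));
        p.run().waitUntilFinish();
      }
    }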

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 928f4a253c6..82424412ffc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLoc
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.util.Collections;
 import java.util.List;
@@ -354,7 +355,7 @@ class BatchLoads<DestinationT, ElementT>
                             rowWriterFactory))
                     .withSideInputs(tempFilePrefixView)
                     .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+    PCollection<KV<DestinationT, WriteTables.Result>> tempTables =
         writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
     List<PCollectionView<?>> sideInputsForUpdateSchema =
@@ -366,7 +367,7 @@ class BatchLoads<DestinationT, ElementT>
             // Now that the load job has happened, we want the rename to happen immediately.
             .apply(
                 "Window Into Global Windows",
-                Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
+                Window.<KV<DestinationT, WriteTables.Result>>into(new GlobalWindows())
                     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
             .apply("Add Void Key", WithKeys.of((Void) null))
             .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
@@ -374,7 +375,7 @@ class BatchLoads<DestinationT, ElementT>
             .apply("Extract Values", Values.create())
             .apply(
                 ParDo.of(
-                        new UpdateSchemaDestination(
+                        new UpdateSchemaDestination<DestinationT>(
                             bigQueryServices,
                             tempLoadJobIdPrefixView,
                             loadJobProjectId,
@@ -470,7 +471,7 @@ class BatchLoads<DestinationT, ElementT>
             .apply("ReifyRenameInput", new ReifyAsIterable<>())
             .apply(
                 ParDo.of(
-                        new UpdateSchemaDestination(
+                        new UpdateSchemaDestination<DestinationT>(
                             bigQueryServices,
                             tempLoadJobIdPrefixView,
                             loadJobProjectId,
@@ -702,7 +703,7 @@ class BatchLoads<DestinationT, ElementT>
   }
 
   // Take in a list of files and write them to temporary tables.
-  private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
+  private PCollection<KV<DestinationT, WriteTables.Result>> writeTempTables(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> jobIdTokenView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
@@ -713,9 +714,6 @@ class BatchLoads<DestinationT, ElementT>
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
 
-    Coder<TableDestination> tableDestinationCoder =
-        clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
-
     // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
     // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
     // the import needs to be split into multiple partitions, and those partitions will be
@@ -746,7 +744,7 @@ class BatchLoads<DestinationT, ElementT>
                 // https://github.com/apache/beam/issues/21105 for additional details.
                 schemaUpdateOptions,
                 tempDataset))
-        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+        .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
@@ -765,7 +763,7 @@ class BatchLoads<DestinationT, ElementT>
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
             WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
-    PCollection<KV<TableDestination, WriteTables.Result>> successfulWrites =
+    PCollection<KV<DestinationT, WriteTables.Result>> successfulWrites =
         input
             .setCoder(partitionsCoder)
             // Reshuffle will distribute this among multiple workers, and also guard against
@@ -789,9 +787,35 @@ class BatchLoads<DestinationT, ElementT>
                     useAvroLogicalTypes,
                     schemaUpdateOptions,
                     null))
-            .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
+            .setCoder(KvCoder.of(destinationCoder, WriteTables.ResultCoder.INSTANCE));
 
-    return successfulWrites.apply(Keys.create());
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    String defaultProjectId =
+        options.getBigQueryProject() == null ? options.getProject() : options.getBigQueryProject();
+
+    return successfulWrites
+        .apply(Keys.create())
+        .apply(
+            "Convert to TableDestinations",
+            ParDo.of(
+                    new DoFn<DestinationT, TableDestination>() {
+                      @ProcessElement
+                      public void processElement(ProcessContext c) {
+                        dynamicDestinations.setSideInputAccessorFromProcessContext(c);
+                        TableDestination tableDestination =
+                            dynamicDestinations.getTable(c.element());
+                        TableReference tableReference = tableDestination.getTableReference();
+
+                        // get project ID from options if it's not included in the table reference
+                        if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+                          tableReference.setProjectId(defaultProjectId);
+                          tableDestination = tableDestination.withTableReference(tableReference);
+                        }
+                        c.output(tableDestination);
+                      }
+                    })
+                .withSideInputs(sideInputs))
+        .setCoder(tableDestinationCoder);
   }
 
   private WriteResult writeResult(Pipeline p, PCollection<TableDestination> successfulWrites) {
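
Both the "Convert to TableDestinations" step above and UpdateSchemaDestination (next file) apply the same project fallback: an explicit project on the table reference wins, then the --bigQueryProject option, then the pipeline's --project. A standalone sketch of that rule follows; the class and method names are mine, not the commit's (the commit itself uses vendored Guava's Strings.isNullOrEmpty for the empty check).

    import com.google.api.services.bigquery.model.TableReference;

    // Sketch of the default-project fallback applied in both files.
    public class DefaultProjectSketch {
      static TableReference withDefaultProject(
          TableReference ref, String bigQueryProject, String pipelineProject) {
        if (ref.getProjectId() == null || ref.getProjectId().isEmpty()) {
          // Prefer --bigQueryProject; otherwise use the pipeline's --project.
          ref.setProjectId(bigQueryProject != null ? bigQueryProject : pipelineProject);
        }
        return ref;
      }

      public static void main(String[] args) {
        TableReference ref = new TableReference().setDatasetId("dataset").setTableId("table");
        withDefaultProject(ref, null, "my-default-project");
        System.out.println(ref.getProjectId()); // my-default-project
      }
    }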
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
index a7aa864d578..2513911087e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/UpdateSchemaDestination.java
@@ -27,6 +27,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -38,14 +39,15 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"nullness", "rawtypes"})
-public class UpdateSchemaDestination
+public class UpdateSchemaDestination<DestinationT>
     extends DoFn<
-        Iterable<KV<TableDestination, WriteTables.Result>>,
+        Iterable<KV<DestinationT, WriteTables.Result>>,
         Iterable<KV<TableDestination, WriteTables.Result>>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(UpdateSchemaDestination.class);
@@ -104,21 +106,37 @@ public class UpdateSchemaDestination
     pendingJobs.clear();
   }
 
+  TableDestination getTableWithDefaultProject(DestinationT destination, BigQueryOptions options) {
+    TableDestination tableDestination = dynamicDestinations.getTable(destination);
+    TableReference tableReference = tableDestination.getTableReference();
+
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(
+          options.getBigQueryProject() == null
+              ? options.getProject()
+              : options.getBigQueryProject());
+      tableDestination = tableDestination.withTableReference(tableReference);
+    }
+
+    return tableDestination;
+  }
+
   @ProcessElement
   public void processElement(
-      @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+      @Element Iterable<KV<DestinationT, WriteTables.Result>> element,
       ProcessContext context,
       BoundedWindow window)
       throws IOException {
-    Object destination = null;
-    for (KV<TableDestination, WriteTables.Result> entry : element) {
+    DestinationT destination = null;
+    BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+    for (KV<DestinationT, WriteTables.Result> entry : element) {
       destination = entry.getKey();
       if (destination != null) {
         break;
       }
     }
     if (destination != null) {
-      TableDestination tableDestination = dynamicDestinations.getTable(destination);
+      TableDestination tableDestination = getTableWithDefaultProject(destination, options);
       TableSchema schema = dynamicDestinations.getSchema(destination);
       TableReference tableReference = tableDestination.getTableReference();
       String jobIdPrefix =
@@ -143,8 +161,13 @@ public class UpdateSchemaDestination
       if (updateSchemaDestinationJob != null) {
         pendingJobs.add(new PendingJobData(updateSchemaDestinationJob, tableDestination, window));
       }
-      context.output(element);
     }
+    List<KV<TableDestination, WriteTables.Result>> tableDestinations = new ArrayList<>();
+    for (KV<DestinationT, WriteTables.Result> entry : element) {
+      tableDestinations.add(
+          KV.of(getTableWithDefaultProject(destination, options), entry.getValue()));
+    }
+    context.output(tableDestinations);
   }
 
   @Teardown
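
UpdateSchemaDestination is now generic in DestinationT: it receives the user's destination keys, resolves each one to a TableDestination itself (so getTable() runs where side inputs and pipeline options are accessible), and re-emits the batch keyed by the resolved tables. A Beam-free toy model of that rekeying, with String standing in for both the destination type and the result:

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    // Toy model of the rekeying UpdateSchemaDestination now performs:
    // resolve the user key to a table late, then key the results by it.
    public class LateResolutionSketch {
      static <DestinationT> List<Map.Entry<String, String>> rekey(
          List<Map.Entry<DestinationT, String>> results,   // ~ KV<DestinationT, Result>
          Function<DestinationT, String> getTable) {       // ~ DynamicDestinations.getTable
        return results.stream()
            .map(e -> Map.entry(getTable.apply(e.getKey()), e.getValue()))
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        List<Map.Entry<String, String>> out =
            rekey(
                List.of(Map.entry("userKey", "load-job-1")),
                key -> "project:dataset." + key);          // toy getTable()
        System.out.println(out); // [project:dataset.userKey=load-job-1]
      }
    }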
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index ce725c989b7..fadbca0280c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -97,7 +97,7 @@ import org.slf4j.LoggerFactory;
 class WriteTables<DestinationT extends @NonNull Object>
     extends PTransform<
         PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>>,
-        PCollection<KV<TableDestination, WriteTables.Result>>> {
+        PCollection<KV<DestinationT, WriteTables.Result>>> {
   @AutoValue
   abstract static class Result {
     abstract String getTableName();
@@ -135,7 +135,7 @@ class WriteTables<DestinationT extends @NonNull Object>
   private final Set<SchemaUpdateOption> schemaUpdateOptions;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final List<PCollectionView<?>> sideInputs;
-  private final TupleTag<KV<TableDestination, WriteTables.Result>> mainOutputTag;
+  private final TupleTag<KV<DestinationT, WriteTables.Result>> mainOutputTag;
   private final TupleTag<String> temporaryFilesTag;
   private final @Nullable ValueProvider<String> loadJobProjectId;
   private final int maxRetryJobs;
@@ -148,8 +148,7 @@ class WriteTables<DestinationT extends @NonNull Object>
   private final @Nullable String tempDataset;
 
   private class WriteTablesDoFn
-      extends DoFn<
-          KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<TableDestination, Result>> {
+      extends DoFn<KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<DestinationT, Result>> {
 
     private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
 
@@ -160,6 +159,7 @@ class WriteTables<DestinationT extends @NonNull Object>
       final List<String> partitionFiles;
       final TableDestination tableDestination;
       final TableReference tableReference;
+      final DestinationT destinationT;
       final boolean isFirstPane;
 
       public PendingJobData(
@@ -168,12 +168,14 @@ class WriteTables<DestinationT extends @NonNull Object>
           List<String> partitionFiles,
           TableDestination tableDestination,
           TableReference tableReference,
+          DestinationT destinationT,
           boolean isFirstPane) {
         this.window = window;
         this.retryJob = retryJob;
         this.partitionFiles = partitionFiles;
         this.tableDestination = tableDestination;
         this.tableReference = tableReference;
+        this.destinationT = destinationT;
         this.isFirstPane = isFirstPane;
       }
     }
@@ -292,6 +294,7 @@ class WriteTables<DestinationT extends @NonNull Object>
               partitionFiles,
               tableDestination,
               tableReference,
+              destination,
               element.getValue().isFirstPane()));
     }
 
@@ -359,7 +362,7 @@ class WriteTables<DestinationT extends @NonNull Object>
                             pendingJob.isFirstPane);
                     c.output(
                         mainOutputTag,
-                        KV.of(pendingJob.tableDestination, result),
+                        KV.of(pendingJob.destinationT, result),
                         pendingJob.window.maxTimestamp(),
                         pendingJob.window);
                     for (String file : pendingJob.partitionFiles) {
@@ -423,7 +426,7 @@ class WriteTables<DestinationT extends @NonNull Object>
   }
 
   @Override
-  public PCollection<KV<TableDestination, Result>> expand(
+  public PCollection<KV<DestinationT, Result>> expand(
       PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input) {
     PCollectionTuple writeTablesOutputs =
         input.apply(
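
The WriteTables change is the enabler for the above: PendingJobData now carries the user's DestinationT alongside the resolved table, so when a load job completes the output is keyed by DestinationT rather than TableDestination. A toy model of that bookkeeping, with illustrative names throughout (the "jobId" string stands in for the async load-job handle):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Toy model of the bookkeeping change: each pending load job remembers
    // the user's DestinationT so the result can be keyed by it on completion.
    public class PendingJobSketch {
      static final class PendingJob<DestinationT> {
        final DestinationT destination;
        final String jobId;

        PendingJob(DestinationT destination, String jobId) {
          this.destination = destination;
          this.jobId = jobId;
        }
      }

      public static void main(String[] args) {
        List<PendingJob<String>> pending = new ArrayList<>();
        pending.add(new PendingJob<>("userKey", "job-42"));

        // When jobs complete, emit KV<DestinationT, Result>-style pairs.
        for (PendingJob<String> job : pending) {
          Map.Entry<String, String> output = Map.entry(job.destination, "done:" + job.jobId);
          System.out.println(output); // userKey=done:job-42
        }
      }
    }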
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 8c8a93a40e5..bd88cd98859 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -104,8 +104,10 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -2209,6 +2211,8 @@ public class BigQueryIOWriteTest implements Serializable {
         p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
     List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
 
+    DynamicDestinations<String, String> dynamicDestinations = new IdentityDynamicTables();
+
     fakeJobService.setNumFailuresExpected(3);
     WriteTables<String> writeTables =
         new WriteTables<>(
@@ -2218,7 +2222,7 @@ public class BigQueryIOWriteTest implements Serializable {
             BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
             sideInputs,
-            new IdentityDynamicTables(),
+            dynamicDestinations,
             null,
             4,
             false,
@@ -2231,12 +2235,24 @@ public class BigQueryIOWriteTest implements Serializable {
     PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
         writeTablesInput
             .apply(writeTables)
-            .setCoder(KvCoder.of(TableDestinationCoderV3.of(), WriteTables.ResultCoder.INSTANCE));
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE))
+            .apply(
+                ParDo.of(
+                    new DoFn<
+                        KV<String, WriteTables.Result>,
+                        KV<TableDestination, WriteTables.Result>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KV<String, WriteTables.Result> e,
+                          OutputReceiver<KV<TableDestination, WriteTables.Result>> o) {
+                        o.output(KV.of(dynamicDestinations.getTable(e.getKey()), e.getValue()));
+                      }
+                    }));
 
     PAssert.thatMultimap(writeTablesOutput)
         .satisfies(
             input -> {
-              assertEquals(input.keySet(), expectedTempTables.keySet());
+              assertEquals(expectedTempTables.keySet(), input.keySet());
               for (Map.Entry<TableDestination, Iterable<WriteTables.Result>> entry :
                   input.entrySet()) {
                 Iterable<String> tableNames =