Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/25 21:00:27 UTC

[GitHub] [beam] ihji commented on a change in pull request #15013: [BEAM-12497] BigQueryIO returns successful rows for streaming inserts

ihji commented on a change in pull request #15013:
URL: https://github.com/apache/beam/pull/15013#discussion_r659015077



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -306,10 +311,13 @@ public void finishBundle(FinishBundleContext context) throws Exception {
                       ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of(valueCoder)))
               .apply(
                   ParDo.of(new InsertBatchedElements())
-                      .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                      .withOutputTags(
+                          mainOutputTag,
+                          TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
       PCollection<ErrorT> failedInserts = result.get(failedOutputTag);
       failedInserts.setCoder(failedOutputCoder);

Review comment:
       ditto.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java.bak
##########
@@ -0,0 +1,1333 @@
+/*

Review comment:
       Please remove the backup file.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -191,23 +192,25 @@ private BatchedStreamingWrite(
   }
 
   @Override
-  public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+  public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
     return batchViaStateful
         ? input.apply(new ViaStateful())
         : input.apply(new ViaBundleFinalization());
   }
 
   private class ViaBundleFinalization
-      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
+      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
     @Override
-    public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+    public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
       PCollectionTuple result =
           input.apply(
               ParDo.of(new BatchAndInsertElements())
-                  .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                  .withOutputTags(
+                      mainOutputTag, TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));

Review comment:
       It's interesting that we already produced multiple PCollections here but only returned the failed inserts.
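  Roughly what it looked like before this PR (a sketch from memory, not the exact code): the ParDo already declared a main output tag plus the failed-inserts tag, but only the failed inserts were surfaced to the caller.
  ```
  PCollectionTuple result =
      input.apply(
          ParDo.of(new BatchAndInsertElements())
              .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
  PCollection<ErrorT> failedInserts = result.get(failedOutputTag);
  failedInserts.setCoder(failedOutputCoder);
  return failedInserts; // the main output was produced but never exposed
  ```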

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
##########
@@ -334,44 +347,47 @@ public WriteResult expand(PCollection<KV<TableDestination, ElementT>> input) {
                       ShardedKeyCoder.of(StringUtf8Coder.of()),
                       TableRowInfoCoder.of(elementCoder)));
 
-      return shardedTagged
-          .apply(Reshuffle.of())
-          // Put in the global window to ensure that DynamicDestinations side inputs are accessed
-          // correctly.
-          .apply(
-              "GlobalWindow",
-              Window.<KV<ShardedKey<String>, TableRowInfo<ElementT>>>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes())
-          .apply(
-              "StripShardId",
-              MapElements.via(
-                  new SimpleFunction<
-                      KV<ShardedKey<String>, TableRowInfo<ElementT>>,
-                      KV<String, TableRowInfo<ElementT>>>() {
-                    @Override
-                    public KV<String, TableRowInfo<ElementT>> apply(
-                        KV<ShardedKey<String>, TableRowInfo<ElementT>> input) {
-                      return KV.of(input.getKey().getKey(), input.getValue());
-                    }
-                  }))
-          .setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder)))
-          // Also batch the TableRows in a best effort manner via bundle finalization before
-          // inserting to BigQuery.
-          .apply(
-              "StreamingWrite",
-              new BatchedStreamingWrite<>(
-                      bigQueryServices,
-                      retryPolicy,
-                      failedInsertsTag,
-                      coder,
-                      errorContainer,
-                      skipInvalidRows,
-                      ignoreUnknownValues,
-                      ignoreInsertIds,
-                      toTableRow,
-                      toFailsafeTableRow)
-                  .viaDoFnFinalization());
+      PCollectionTuple result =

Review comment:
       ditto.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java
##########
@@ -305,20 +316,22 @@ public WriteResult expand(PCollection<KV<TableDestination, ElementT>> input) {
 
       // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey transform which groups and at
       // the same time batches the TableRows to be inserted to BigQuery.
-      return unshardedTagged.apply(
-          "StreamingWrite",
-          new BatchedStreamingWrite<>(
-                  bigQueryServices,
-                  retryPolicy,
-                  failedInsertsTag,
-                  coder,
-                  errorContainer,
-                  skipInvalidRows,
-                  ignoreUnknownValues,
-                  ignoreInsertIds,
-                  toTableRow,
-                  toFailsafeTableRow)
-              .viaStateful());
+      PCollectionTuple result =

Review comment:
       Looks like there's no change here except introducing a temporary variable `result`.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -828,9 +855,41 @@ public void deleteDataset(String projectId, String datasetId)
         List<ValueInSingleWindow<T>> failedInserts,
         ErrorContainer<T> errorContainer,
         boolean skipInvalidRows,
-        boolean ignoreUnkownValues,
+        boolean ignoreUnknownValues,
         boolean ignoreInsertIds)
         throws IOException, InterruptedException {
+      return insertAll(

Review comment:
       Looks like now we have four variants of `insertAll`:
   ```
     a) public insertAll (with successfulRows) 
     b) public insertAll (without successfulRows)
     c) protected insertAll (with successfulRows)
     d) protected insertAll (without successfulRows)
   // b calls a, d calls c, a calls c
   ```
   If these methods are mostly for internal use, I would prefer to have just:
   ```
     a) public insertAll (with successfulRows)   
     b) protected insertAll (with successfulRows)
   // a calls b
   ```
   and explicitly put `null` for all existing call-sites of `a` and `b`. 
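  Not the actual Beam signatures — just a minimal, self-contained sketch of the shape I mean, with made-up types and names:
  ```
  import java.util.List;

  class InsertAllSketch {
    // (a) the single public variant; call-sites that don't need successful rows pass null.
    public long insertAll(List<String> rows, List<String> failed, List<String> successful) {
      return insertAll(rows, failed, successful, /* maxAttempts= */ 3); // a calls b
    }

    // (b) the single protected variant; the extra knob stands in for the test-only parameters.
    protected long insertAll(
        List<String> rows, List<String> failed, List<String> successful, int maxAttempts) {
      long inserted = 0;
      for (String row : rows) {
        boolean ok = false;
        for (int attempt = 0; attempt < maxAttempts && !ok; attempt++) {
          ok = !row.isEmpty(); // stands in for the real streaming-insert call
        }
        if (ok) {
          inserted++;
          if (successful != null) {
            successful.add(row); // only collected when the caller asked for it
          }
        } else {
          failed.add(row);
        }
      }
      return inserted;
    }
  }
  ```
  Existing call-sites of the dropped overloads would then pass `null` explicitly, e.g. `insertAll(rows, failed, /* successfulRows= */ null)`.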

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
##########
@@ -191,23 +192,25 @@ private BatchedStreamingWrite(
   }
 
   @Override
-  public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+  public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
     return batchViaStateful
         ? input.apply(new ViaStateful())
         : input.apply(new ViaBundleFinalization());
   }
 
   private class ViaBundleFinalization
-      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
+      extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollectionTuple> {
     @Override
-    public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
+    public PCollectionTuple expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
       PCollectionTuple result =
           input.apply(
               ParDo.of(new BatchAndInsertElements())
-                  .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag)));
+                  .withOutputTags(
+                      mainOutputTag, TupleTagList.of(failedOutputTag).and(SUCCESSFUL_ROWS_TAG)));
       PCollection<ErrorT> failedInserts = result.get(failedOutputTag);
       failedInserts.setCoder(failedOutputCoder);

Review comment:
       Looks like this could also be a one-liner, like the line just below.
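  i.e. something like (untested, assuming the local `failedInserts` variable isn't needed afterwards):
  ```
  result.get(failedOutputTag).setCoder(failedOutputCoder);
  ```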

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
##########
@@ -844,6 +903,7 @@ public void deleteDataset(String projectId, String datasetId)
                 + "as many elements as rowList");
       }
 
+      final Set<Integer> failedIndices = new HashSet<>();

Review comment:
       How big could the `failedIndices` set get? Any implications for performance? Probably okay since `insertAll` is only used for streaming inserts.
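  For reference, roughly how I read it (a hypothetical, self-contained sketch, not the actual code): the set only holds indices of rows that failed within one `insertAll` batch, so its size is bounded by the batch size.
  ```
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  class FailedIndicesSketch {
    public static void main(String[] args) {
      // failedIndices grows only with the rows rejected in this one batch.
      List<String> rowList = Arrays.asList("row-0", "row-1", "row-2");
      Set<Integer> failedIndices = new HashSet<>();
      failedIndices.add(1); // pretend row 1 was rejected by BigQuery

      // everything not in failedIndices can then be reported as a successful row
      List<String> successfulRows = new ArrayList<>();
      for (int i = 0; i < rowList.size(); i++) {
        if (!failedIndices.contains(i)) {
          successfulRows.add(rowList.get(i));
        }
      }
      System.out.println(successfulRows); // [row-0, row-2]
    }
  }
  ```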



