You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/04/21 21:20:52 UTC

[beam] branch master updated: Add a BQ option for configuring buffering duration when auto-sharding is used.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e5b35  Add a BQ option for configuring buffering duration when auto-sharding is used.
     new 9182d8d  Merge pull request #14590 from [BEAM-11772, BEAM-11408] Add a BQ option for configuring buffering duration when auto-sharding is used
c1e5b35 is described below

commit c1e5b3547b0dc23b4ae4804b410be2d018781470
Author: sychen <sy...@google.com>
AuthorDate: Tue Apr 20 12:01:13 2021 -0700

    Add a BQ option for configuring buffering duration when auto-sharding is used.
---
 .../java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java   |  7 ++++++-
 .../apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 10 +++++++++-
 .../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java   |  8 ++++++++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 0d4f7b7..0e87243 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -574,6 +574,11 @@ class BatchLoads<DestinationT, ElementT>
   // of filename, file byte size, and table destination.
   PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
       PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
+    BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+    Duration maxBufferingDuration =
+        options.getMaxBufferingDurationMilliSec() > 0
+            ? Duration.millis(options.getMaxBufferingDurationMilliSec())
+            : FILE_TRIGGERING_BATCHING_DURATION;
     // In contrast to fixed sharding with user trigger, here we use a global window with default
     // trigger and rely on GroupIntoBatches transform to group, batch and at the same time
     // parallelize properly. We also ensure that the files are written if a threshold number of
@@ -582,7 +587,7 @@ class BatchLoads<DestinationT, ElementT>
     return input
         .apply(
             GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
-                .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
+                .withMaxBufferingDuration(maxBufferingDuration)
                 .withShardedKey())
         .setCoder(
             KvCoder.of(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index e1bf02f..404e740 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -61,6 +61,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** PTransform to perform batched streaming BigQuery write. */
 @SuppressWarnings({
@@ -69,6 +71,7 @@ import org.joda.time.Instant;
 class BatchedStreamingWrite<ErrorT, ElementT>
     extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
   private static final TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
+  private static final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class);
 
   private final BigQueryServices bqServices;
   private final InsertRetryPolicy retryPolicy;
@@ -293,6 +296,10 @@ class BatchedStreamingWrite<ErrorT, ElementT>
     @Override
     public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+      Duration maxBufferingDuration =
+          options.getMaxBufferingDurationMilliSec() > 0
+              ? Duration.millis(options.getMaxBufferingDurationMilliSec())
+              : BATCH_MAX_BUFFERING_DURATION;
       KvCoder<String, TableRowInfo<ElementT>> inputCoder = (KvCoder) input.getCoder();
       TableRowInfoCoder<ElementT> valueCoder =
           (TableRowInfoCoder) inputCoder.getCoderArguments().get(1);
@@ -310,7 +317,7 @@ class BatchedStreamingWrite<ErrorT, ElementT>
               .apply(
                   GroupIntoBatches.<String, TableRowInfo<ElementT>>ofSize(
                           options.getMaxStreamingRowsToBatch())
-                      .withMaxBufferingDuration(BATCH_MAX_BUFFERING_DURATION)
+                      .withMaxBufferingDuration(maxBufferingDuration)
                       .withShardedKey())
               .setCoder(
                   KvCoder.of(
@@ -347,6 +354,7 @@ class BatchedStreamingWrite<ErrorT, ElementT>
                 tableRow, context.timestamp(), window, context.pane(), failsafeTableRow));
         uniqueIds.add(row.uniqueId);
       }
+      LOG.info("Writing to BigQuery using Auto-sharding. Flushing {} rows.", tableRows.size());
       BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
       TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
       List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 383e79e..d350d15 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -97,4 +97,12 @@ public interface BigQueryOptions
   Integer getStorageWriteApiTriggeringFrequencySec();
 
   void setStorageWriteApiTriggeringFrequencySec(Integer value);
+
+  @Description(
+      "When auto-sharding is used, the maximum duration in milliseconds the input records are"
+          + " allowed to be buffered before being written to BigQuery.")
+  @Default.Integer(0)
+  Integer getMaxBufferingDurationMilliSec();
+
+  void setMaxBufferingDurationMilliSec(Integer value);
 }