You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2021/04/21 21:20:52 UTC
[beam] branch master updated: Add a BQ option for configuring buffering duration when auto-sharding is used.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c1e5b35 Add a BQ option for configuring buffering duration when auto-sharding is used.
new 9182d8d Merge pull request #14590 from [BEAM-11772, BEAM-11408] Add a BQ option for configuring buffering duration when auto-sharding is used
c1e5b35 is described below
commit c1e5b3547b0dc23b4ae4804b410be2d018781470
Author: sychen <sy...@google.com>
AuthorDate: Tue Apr 20 12:01:13 2021 -0700
Add a BQ option for configuring buffering duration when auto-sharding is used.
---
.../java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java | 7 ++++++-
.../apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 10 +++++++++-
.../org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java | 8 ++++++++
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 0d4f7b7..0e87243 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -574,6 +574,11 @@ class BatchLoads<DestinationT, ElementT>
// of filename, file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ Duration maxBufferingDuration =
+ options.getMaxBufferingDurationMilliSec() > 0
+ ? Duration.millis(options.getMaxBufferingDurationMilliSec())
+ : FILE_TRIGGERING_BATCHING_DURATION;
// In contrast to fixed sharding with user trigger, here we use a global window with default
// trigger and rely on GroupIntoBatches transform to group, batch and at the same time
// parallelize properly. We also ensure that the files are written if a threshold number of
@@ -582,7 +587,7 @@ class BatchLoads<DestinationT, ElementT>
return input
.apply(
GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
- .withMaxBufferingDuration(FILE_TRIGGERING_BATCHING_DURATION)
+ .withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())
.setCoder(
KvCoder.of(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index e1bf02f..404e740 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -61,6 +61,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** PTransform to perform batched streaming BigQuery write. */
@SuppressWarnings({
@@ -69,6 +71,7 @@ import org.joda.time.Instant;
class BatchedStreamingWrite<ErrorT, ElementT>
extends PTransform<PCollection<KV<String, TableRowInfo<ElementT>>>, PCollection<ErrorT>> {
private static final TupleTag<Void> mainOutputTag = new TupleTag<>("mainOutput");
+ private static final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class);
private final BigQueryServices bqServices;
private final InsertRetryPolicy retryPolicy;
@@ -293,6 +296,10 @@ class BatchedStreamingWrite<ErrorT, ElementT>
@Override
public PCollection<ErrorT> expand(PCollection<KV<String, TableRowInfo<ElementT>>> input) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+ Duration maxBufferingDuration =
+ options.getMaxBufferingDurationMilliSec() > 0
+ ? Duration.millis(options.getMaxBufferingDurationMilliSec())
+ : BATCH_MAX_BUFFERING_DURATION;
KvCoder<String, TableRowInfo<ElementT>> inputCoder = (KvCoder) input.getCoder();
TableRowInfoCoder<ElementT> valueCoder =
(TableRowInfoCoder) inputCoder.getCoderArguments().get(1);
@@ -310,7 +317,7 @@ class BatchedStreamingWrite<ErrorT, ElementT>
.apply(
GroupIntoBatches.<String, TableRowInfo<ElementT>>ofSize(
options.getMaxStreamingRowsToBatch())
- .withMaxBufferingDuration(BATCH_MAX_BUFFERING_DURATION)
+ .withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())
.setCoder(
KvCoder.of(
@@ -347,6 +354,7 @@ class BatchedStreamingWrite<ErrorT, ElementT>
tableRow, context.timestamp(), window, context.pane(), failsafeTableRow));
uniqueIds.add(row.uniqueId);
}
+ LOG.info("Writing to BigQuery using Auto-sharding. Flushing {} rows.", tableRows.size());
BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey());
List<ValueInSingleWindow<ErrorT>> failedInserts = Lists.newArrayList();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
index 383e79e..d350d15 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -97,4 +97,12 @@ public interface BigQueryOptions
Integer getStorageWriteApiTriggeringFrequencySec();
void setStorageWriteApiTriggeringFrequencySec(Integer value);
+
+ @Description(
+ "When auto-sharding is used, the maximum duration in milliseconds the input records are"
+ + " allowed to be buffered before being written to BigQuery.")
+ @Default.Integer(0)
+ Integer getMaxBufferingDurationMilliSec();
+
+ void setMaxBufferingDurationMilliSec(Integer value);
}