You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ki...@apache.org on 2022/08/04 19:12:59 UTC
[beam] branch release-2.41.0 updated: Merge pull request #22347: [22188]Set allowed timestamp skew
This is an automated email from the ASF dual-hosted git repository.
kileysok pushed a commit to branch release-2.41.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.41.0 by this push:
new c5b630a8b97 Merge pull request #22347: [22188]Set allowed timestamp skew
new 5d5bac79c13 Merge pull request #22589 from reuvenlax/release-2.41.0
c5b630a8b97 is described below
commit c5b630a8b973bd07b9305487173e011be09cd6a8
Author: Reuven Lax <re...@google.com>
AuthorDate: Thu Aug 4 10:32:20 2022 -0700
Merge pull request #22347: [22188]Set allowed timestamp skew
---
.../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 1ccd527497b..afecc966955 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -553,6 +553,11 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
OutputReceiver<KV<String, Operation>> o,
BoundedWindow window) {
// Stream is idle - clear it.
+ // Note: this is best effort. We are explicitly emitting a timestamp that is before
+ // the default output timestamp, which means that in some cases (usually when draining
+ // a pipeline) this finalize element will be dropped as late. This is usually ok as
+ // BigQuery will eventually garbage collect the stream. We attempt to finalize idle streams
+ // merely to remove the pressure of large numbers of orphaned streams from BigQuery.
finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
streamsIdle.inc();
}
@@ -567,5 +572,10 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
// streams so that they are not leaked.
finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
}
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ }
}
}