You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ki...@apache.org on 2022/08/04 19:12:59 UTC

[beam] branch release-2.41.0 updated: Merge pull request #22347: [22188]Set allowed timestamp skew

This is an automated email from the ASF dual-hosted git repository.

kileysok pushed a commit to branch release-2.41.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.41.0 by this push:
     new c5b630a8b97 Merge pull request #22347: [22188]Set allowed timestamp skew
     new 5d5bac79c13 Merge pull request #22589 from reuvenlax/release-2.41.0
c5b630a8b97 is described below

commit c5b630a8b973bd07b9305487173e011be09cd6a8
Author: Reuven Lax <re...@google.com>
AuthorDate: Thu Aug 4 10:32:20 2022 -0700

    Merge pull request #22347: [22188]Set allowed timestamp skew
---
 .../sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java    | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 1ccd527497b..afecc966955 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -553,6 +553,11 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
         OutputReceiver<KV<String, Operation>> o,
         BoundedWindow window) {
       // Stream is idle - clear it.
+      // Note: this is best effort. We are explicitly emitting a timestamp that is before
+      // the default output timestamp, which means that in some cases (usually when draining
+      // a pipeline) this finalize element will be dropped as late. This is usually ok as
+      // BigQuery will eventually garbage collect the stream. We attempt to finalize idle streams
+      // merely to remove the pressure of large numbers of orphaned streams from BigQuery.
       finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
       streamsIdle.inc();
     }
@@ -567,5 +572,10 @@ public class StorageApiWritesShardedRecords<DestinationT extends @NonNull Object
       // streams so that they are not leaked.
       finalizeStream(streamName, streamOffset, o, window.maxTimestamp());
     }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    }
   }
 }