You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/07 22:26:12 UTC

[beam] branch release-2.25.0 updated: Merge pull request #13041 from lukecwik/beam10670.6

This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.25.0 by this push:
     new b0ead6e  Merge pull request #13041 from lukecwik/beam10670.6
     new c7ae63a  Merge pull request #13044 from lukecwik/beam10670.6
b0ead6e is described below

commit b0ead6ea1e913fda04b8df93b65d40f651b7f9de
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Wed Oct 7 13:49:34 2020 -0700

    Merge pull request #13041 from lukecwik/beam10670.6
    
    [BEAM-10670][BEAM-11028][BEAM-10997] Ensure that UnboundedSourceAsSDFWrapperFn returns stop() when the UnboundedSource gets to BoundedWindow.TIMESTAMP_MAX_VALUE. Also close readers when they are done.
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 992e3dd..f8c7151 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -532,8 +532,7 @@ public class Read {
           tracker.currentRestriction();
 
       // Advance the watermark even if zero elements may have been output.
-      watermarkEstimator.setWatermark(
-          ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+      watermarkEstimator.setWatermark(currentRestriction.getWatermark());
 
       // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not
       // the initial restriction. The initial restriction would have been finalized as part of
@@ -562,7 +561,7 @@ public class Read {
       return currentElementTimestamp;
     }
 
-    private Instant ensureTimestampWithinBounds(Instant timestamp) {
+    private static Instant ensureTimestampWithinBounds(Instant timestamp) {
       if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
         timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
       } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -842,10 +841,23 @@ public class Read {
         if (currentReader == null) {
           return initialRestriction;
         }
+        Instant watermark = ensureTimestampWithinBounds(currentReader.getWatermark());
+        // We convert the reader to the empty reader to mark that we are done.
+        if (!(currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader)
+            && BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+          CheckpointT checkpointT = (CheckpointT) currentReader.getCheckpointMark();
+          try {
+            currentReader.close();
+          } catch (IOException e) {
+            LOG.warn("Failed to close UnboundedReader.", e);
+          } finally {
+            currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, checkpointT);
+          }
+        }
         return UnboundedSourceRestriction.create(
             (UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
             (CheckpointT) currentReader.getCheckpointMark(),
-            currentReader.getWatermark());
+            watermark);
       }
 
       @Override
@@ -866,8 +878,14 @@ public class Read {
                 UnboundedSourceRestriction.create(
                     EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
                 currentRestriction);
-        currentReader =
-            EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+        try {
+          currentReader.close();
+        } catch (IOException e) {
+          LOG.warn("Failed to close UnboundedReader.", e);
+        } finally {
+          currentReader =
+              EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+        }
         return result;
       }