You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/10/07 22:26:12 UTC
[beam] branch release-2.25.0 updated: Merge pull request #13041 from lukecwik/beam10670.6
This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch release-2.25.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.25.0 by this push:
new b0ead6e Merge pull request #13041 from lukecwik/beam10670.6
new c7ae63a Merge pull request #13044 from lukecwik/beam10670.6
b0ead6e is described below
commit b0ead6ea1e913fda04b8df93b65d40f651b7f9de
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Wed Oct 7 13:49:34 2020 -0700
Merge pull request #13041 from lukecwik/beam10670.6
[BEAM-10670][BEAM-11028][BEAM-10997] Ensure that UnboundedSourceAsSDFWrapperFn returns stop() when the UnboundedSource gets to BoundedWindow.TIMESTAMP_MAX_VALUE. Also close readers when they are done.
---
.../src/main/java/org/apache/beam/sdk/io/Read.java | 30 +++++++++++++++++-----
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 992e3dd..f8c7151 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -532,8 +532,7 @@ public class Read {
tracker.currentRestriction();
// Advance the watermark even if zero elements may have been output.
- watermarkEstimator.setWatermark(
- ensureTimestampWithinBounds(currentRestriction.getWatermark()));
+ watermarkEstimator.setWatermark(currentRestriction.getWatermark());
// Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial and is not
// the initial restriction. The initial restriction would have been finalized as part of
@@ -562,7 +561,7 @@ public class Read {
return currentElementTimestamp;
}
- private Instant ensureTimestampWithinBounds(Instant timestamp) {
+ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
} else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
@@ -842,10 +841,23 @@ public class Read {
if (currentReader == null) {
return initialRestriction;
}
+ Instant watermark = ensureTimestampWithinBounds(currentReader.getWatermark());
+ // We convert the reader to the empty reader to mark that we are done.
+ if (!(currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader)
+ && BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
+ CheckpointT checkpointT = (CheckpointT) currentReader.getCheckpointMark();
+ try {
+ currentReader.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close UnboundedReader.", e);
+ } finally {
+ currentReader = EmptyUnboundedSource.INSTANCE.createReader(null, checkpointT);
+ }
+ }
return UnboundedSourceRestriction.create(
(UnboundedSource<OutputT, CheckpointT>) currentReader.getCurrentSource(),
(CheckpointT) currentReader.getCheckpointMark(),
- currentReader.getWatermark());
+ watermark);
}
@Override
@@ -866,8 +878,14 @@ public class Read {
UnboundedSourceRestriction.create(
EmptyUnboundedSource.INSTANCE, null, BoundedWindow.TIMESTAMP_MAX_VALUE),
currentRestriction);
- currentReader =
- EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+ try {
+ currentReader.close();
+ } catch (IOException e) {
+ LOG.warn("Failed to close UnboundedReader.", e);
+ } finally {
+ currentReader =
+ EmptyUnboundedSource.INSTANCE.createReader(null, currentRestriction.getCheckpoint());
+ }
return result;
}