You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/26 22:02:53 UTC
samza git commit: SAMZA-1578: Fix watermark bug found by BEAM tests
Repository: samza
Updated Branches:
refs/heads/master 9674836f6 -> 7e68e4b10
SAMZA-1578: Fix watermark bug found by BEAM tests
The problem is getOutputWatermark() does not return the real outputWatermark. This caused problem in user override watermark function.
Author: xiliu <xi...@linkedin.com>
Reviewers: Jagadish <vj...@gmail.com>
Closes #415 from xinyuiscool/SAMZA-1578
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7e68e4b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7e68e4b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7e68e4b1
Branch: refs/heads/master
Commit: 7e68e4b109624e1f45d5db6325df9577a1573d55
Parents: 9674836
Author: Xinyu Liu <xi...@gmail.com>
Authored: Fri Jan 26 14:02:42 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Jan 26 14:02:42 2018 -0800
----------------------------------------------------------------------
.../java/org/apache/samza/operators/impl/OperatorImpl.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7e68e4b1/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 9b2b4cf..15b763d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -356,7 +356,7 @@ public abstract class OperatorImpl<M, RM> {
// use samza-provided watermark handling
// default is to propagate the input watermark
output = handleWatermark(currentWatermark, collector, coordinator);
- outputWm = getOutputWatermark();
+ outputWm = currentWatermark;
}
if (!output.isEmpty()) {
@@ -402,14 +402,13 @@ public abstract class OperatorImpl<M, RM> {
}
/**
- * Returns the output watermark, default is the same as input.
- * Operators which keep track of watermark should override this to return the current watermark.
+ * Returns the output watermark,
* @return output watermark
*/
- protected long getOutputWatermark() {
+ final long getOutputWatermark() {
if (usedInCurrentTask) {
// default as input
- return this.currentWatermark;
+ return this.outputWatermark;
} else {
// always emit the max to indicate no input will be emitted afterwards
return Long.MAX_VALUE;