You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/01/26 22:02:53 UTC

samza git commit: SAMZA-1578: Fix watermark bug found by BEAM tests

Repository: samza
Updated Branches:
  refs/heads/master 9674836f6 -> 7e68e4b10


SAMZA-1578: Fix watermark bug found by BEAM tests

The problem is that getOutputWatermark() does not return the real outputWatermark. This caused problems in user-overridden watermark functions.

Author: xiliu <xi...@linkedin.com>

Reviewers: Jagadish <vj...@gmail.com>

Closes #415 from xinyuiscool/SAMZA-1578


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7e68e4b1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7e68e4b1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7e68e4b1

Branch: refs/heads/master
Commit: 7e68e4b109624e1f45d5db6325df9577a1573d55
Parents: 9674836
Author: Xinyu Liu <xi...@gmail.com>
Authored: Fri Jan 26 14:02:42 2018 -0800
Committer: xiliu <xi...@linkedin.com>
Committed: Fri Jan 26 14:02:42 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/operators/impl/OperatorImpl.java  | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/7e68e4b1/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 9b2b4cf..15b763d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -356,7 +356,7 @@ public abstract class OperatorImpl<M, RM> {
         // use samza-provided watermark handling
         // default is to propagate the input watermark
         output = handleWatermark(currentWatermark, collector, coordinator);
-        outputWm = getOutputWatermark();
+        outputWm = currentWatermark;
       }
 
       if (!output.isEmpty()) {
@@ -402,14 +402,13 @@ public abstract class OperatorImpl<M, RM> {
   }
 
   /**
-   * Returns the output watermark, default is the same as input.
-   * Operators which keep track of watermark should override this to return the current watermark.
+   * Returns the output watermark,
    * @return output watermark
    */
-  protected long getOutputWatermark() {
+  final long getOutputWatermark() {
     if (usedInCurrentTask) {
       // default as input
-      return this.currentWatermark;
+      return this.outputWatermark;
     } else {
       // always emit the max to indicate no input will be emitted afterwards
       return Long.MAX_VALUE;