You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/11 20:21:44 UTC

samza git commit: SAMZA-1645: A few issues found by BEAM stress test

Repository: samza
Updated Branches:
  refs/heads/master fda1e37d0 -> 262941516


SAMZA-1645: A few issues found by BEAM stress test

1. Revert the priority (Integer.MAX_VALUE) that was set on intermediate streams.
2. Fix the watermark propagation condition: only proceed when the new watermark is greater than the current watermark.

Author: xinyuiscool <xi...@linkedin.com>

Reviewers: Prateek M <pr...@apache.org>

Closes #469 from xinyuiscool/SAMZA-1645


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/26294151
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/26294151
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/26294151

Branch: refs/heads/master
Commit: 26294151642283c1cfb51590b51a86d2eaedd11f
Parents: fda1e37
Author: xinyuiscool <xi...@linkedin.com>
Authored: Wed Apr 11 13:21:36 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Apr 11 13:21:36 2018 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/execution/StreamEdge.java      | 1 -
 .../main/java/org/apache/samza/operators/impl/OperatorImpl.java   | 3 ++-
 .../src/test/java/org/apache/samza/execution/TestStreamEdge.java  | 1 -
 3 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index bc08e70..62d85f1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -125,7 +125,6 @@ public class StreamEdge {
     if (isIntermediate()) {
       config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
-      config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     if (spec.isBounded()) {
       config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");

http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index f644bd9..608b2be 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -328,8 +328,9 @@ public abstract class OperatorImpl<M, RM> {
     LOG.debug("Received watermark {} from {}", watermarkMessage.getTimestamp(), ssp);
     watermarkStates.update(watermarkMessage, ssp);
     long watermark = watermarkStates.getWatermark(ssp.getSystemStream());
-    if (watermark != WatermarkStates.WATERMARK_NOT_EXIST) {
+    if (currentWatermark < watermark) {
       LOG.debug("Got watermark {} from stream {}", watermark, ssp.getSystemStream());
+
       if (watermarkMessage.getTaskName() != null) {
         // This is the aggregation task, which already received all the watermark messages from upstream
         // broadcast the watermark to all the peer partitions

http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 424f102..0a225f5 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -80,6 +80,5 @@ public class TestStreamEdge {
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
     assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
-    assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
   }
 }