You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/11 20:21:44 UTC
samza git commit: SAMZA-1645: A few issues found by BEAM stress test
Repository: samza
Updated Branches:
refs/heads/master fda1e37d0 -> 262941516
SAMZA-1645: A few issues found by BEAM stress test
1. Revert the priority set to intermediate streams.
2. Fix a watermark propagation condition
Author: xinyuiscool <xi...@linkedin.com>
Reviewers: Prateek M <pr...@apache.org>
Closes #469 from xinyuiscool/SAMZA-1645
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/26294151
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/26294151
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/26294151
Branch: refs/heads/master
Commit: 26294151642283c1cfb51590b51a86d2eaedd11f
Parents: fda1e37
Author: xinyuiscool <xi...@linkedin.com>
Authored: Wed Apr 11 13:21:36 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Apr 11 13:21:36 2018 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/samza/execution/StreamEdge.java | 1 -
.../main/java/org/apache/samza/operators/impl/OperatorImpl.java | 3 ++-
.../src/test/java/org/apache/samza/execution/TestStreamEdge.java | 1 -
3 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index bc08e70..62d85f1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -125,7 +125,6 @@ public class StreamEdge {
if (isIntermediate()) {
config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
- config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
}
if (spec.isBounded()) {
config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");
http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index f644bd9..608b2be 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -328,8 +328,9 @@ public abstract class OperatorImpl<M, RM> {
LOG.debug("Received watermark {} from {}", watermarkMessage.getTimestamp(), ssp);
watermarkStates.update(watermarkMessage, ssp);
long watermark = watermarkStates.getWatermark(ssp.getSystemStream());
- if (watermark != WatermarkStates.WATERMARK_NOT_EXIST) {
+ if (currentWatermark < watermark) {
LOG.debug("Got watermark {} from stream {}", watermark, ssp.getSystemStream());
+
if (watermarkMessage.getTaskName() != null) {
// This is the aggregation task, which already received all the watermark messages from upstream
// broadcast the watermark to all the peer partitions
http://git-wip-us.apache.org/repos/asf/samza/blob/26294151/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 424f102..0a225f5 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -80,6 +80,5 @@ public class TestStreamEdge {
streamConfig = new StreamConfig(config);
assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
- assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
}
}