You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2017/05/22 19:31:54 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
Repository: apex-malhar
Updated Branches:
refs/heads/master c830f5e4f -> c84a2c867
APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c784f4da
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c784f4da
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c784f4da
Branch: refs/heads/master
Commit: c784f4da46d1cf594aa4156135b9c196aa66d931
Parents: 3a36298
Author: chaitanya <ch...@apache.org>
Authored: Thu May 18 16:07:52 2017 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Thu May 18 16:07:52 2017 +0530
----------------------------------------------------------------------
.../kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c784f4da/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
+ if (consumerRecord.offset() >= currentOffset) {
+ crossedBoundary = true;
+ break;
+ }
+
if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
continue;
}
@@ -365,10 +370,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
partialWindowTuples.put(value, 1);
}
- if (consumerRecord.offset() >= currentOffset) {
- crossedBoundary = true;
- break;
- }
}
if (crossedBoundary) {
[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2493'
Posted by sa...@apache.org.
Merge branch 'APEXMALHAR-2493'
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c84a2c86
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c84a2c86
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c84a2c86
Branch: refs/heads/master
Commit: c84a2c86796bebe6b03353c43ec36a7c76cb41c8
Parents: c830f5e c784f4d
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Mon May 22 12:30:49 2017 -0700
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Mon May 22 12:30:49 2017 -0700
----------------------------------------------------------------------
.../kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------