You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by sa...@apache.org on 2017/05/22 19:31:54 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery

Repository: apex-malhar
Updated Branches:
  refs/heads/master c830f5e4f -> c84a2c867


APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c784f4da
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c784f4da
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c784f4da

Branch: refs/heads/master
Commit: c784f4da46d1cf594aa4156135b9c196aa66d931
Parents: 3a36298
Author: chaitanya <ch...@apache.org>
Authored: Thu May 18 16:07:52 2017 +0530
Committer: chaitanya <ch...@apache.org>
Committed: Thu May 18 16:07:52 2017 +0530

----------------------------------------------------------------------
 .../kafka/KafkaSinglePortExactlyOnceOutputOperator.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c784f4da/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
         for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
 
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+
           if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
             continue;
           }
@@ -365,10 +370,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
             partialWindowTuples.put(value, 1);
           }
 
-          if (consumerRecord.offset() >= currentOffset) {
-            crossedBoundary = true;
-            break;
-          }
         }
 
         if (crossedBoundary) {


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2493'

Posted by sa...@apache.org.
Merge branch 'APEXMALHAR-2493'


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c84a2c86
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c84a2c86
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c84a2c86

Branch: refs/heads/master
Commit: c84a2c86796bebe6b03353c43ec36a7c76cb41c8
Parents: c830f5e c784f4d
Author: Sandesh Hegde <sa...@gmail.com>
Authored: Mon May 22 12:30:49 2017 -0700
Committer: Sandesh Hegde <sa...@gmail.com>
Committed: Mon May 22 12:30:49 2017 -0700

----------------------------------------------------------------------
 .../kafka/KafkaSinglePortExactlyOnceOutputOperator.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------