You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/20 09:56:50 UTC

[4/7] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message

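APEXMALHAR-1973 #comment Disable committing offsets when the initial offset is LATEST or EARLIEST, and track the offset of the next message to consume (last emitted offset + 1) instead of the last emitted offset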


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ca1c7e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ca1c7e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ca1c7e60

Branch: refs/heads/release-3.3
Commit: ca1c7e60f91779d7c19ecc70baa997d7ae2a2c47
Parents: be71996
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Feb 20 00:17:42 2016 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java  | 4 +++-
 .../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java    | 7 ++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index fbff2e7..512f058 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -179,6 +179,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @Override
   public void committed(long windowId)
   {
+    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+      return;
     //ask kafka consumer wrapper to store the committed offsets
     for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -204,7 +206,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
       emitTuple(tuple.getLeft(), msg);
       AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
           msg.topic(), msg.partition());
-      offsetTrack.put(pm, msg.offset());
+      offsetTrack.put(pm, msg.offset() + 1);
     }
     emitCount += count;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c708145..53bbd2a 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -258,14 +258,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
         return false;
       }
       PartitionMeta that = (PartitionMeta)o;
-      return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(getTopicPartition(), that.getTopicPartition());
+      return Objects.equals(partitionId, that.partitionId) &&
+        Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topic, that.topic);
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, getTopicPartition());
+      return Objects.hash(cluster, topic, partitionId);
     }
 
     @Override