You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/14 09:05:49 UTC

[2/3] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message

APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c464f064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c464f064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c464f064

Branch: refs/heads/devel-3
Commit: c464f064b590b786f257a765df026cb29a50c8ae
Parents: b431eb3
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 13 17:12:08 2016 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java  | 4 +++-
 .../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java    | 7 ++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4f2f704..c021c1c 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -177,6 +177,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @Override
   public void committed(long windowId)
   {
+    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+      return;
     //ask kafka consumer wrapper to store the committed offsets
     for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -202,7 +204,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
       emitTuple(tuple.getLeft(), msg);
       AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
           msg.topic(), msg.partition());
-      offsetTrack.put(pm, msg.offset());
+      offsetTrack.put(pm, msg.offset() + 1);
     }
     emitCount += count;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 2159e4f..57c6998 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -256,14 +256,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
         return false;
       }
       PartitionMeta that = (PartitionMeta)o;
-      return Objects.equals(cluster, that.cluster) &&
-        Objects.equals(getTopicPartition(), that.getTopicPartition());
+      return Objects.equals(partitionId, that.partitionId) &&
+        Objects.equals(cluster, that.cluster) &&
+        Objects.equals(topic, that.topic);
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(cluster, getTopicPartition());
+      return Objects.hash(cluster, topic, partitionId);
     }
 
     @Override