You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/14 09:05:49 UTC
[2/3] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c464f064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c464f064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c464f064
Branch: refs/heads/devel-3
Commit: c464f064b590b786f257a765df026cb29a50c8ae
Parents: b431eb3
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Wed Jan 13 17:12:08 2016 -0800
----------------------------------------------------------------------
.../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 4 +++-
.../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java | 7 ++++---
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 4f2f704..c021c1c 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -177,6 +177,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
@Override
public void committed(long windowId)
{
+ if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+ return;
//ask kafka consumer wrapper to store the committed offsets
for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -202,7 +204,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
emitTuple(tuple.getLeft(), msg);
AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
msg.topic(), msg.partition());
- offsetTrack.put(pm, msg.offset());
+ offsetTrack.put(pm, msg.offset() + 1);
}
emitCount += count;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c464f064/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 2159e4f..57c6998 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -256,14 +256,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
return false;
}
PartitionMeta that = (PartitionMeta)o;
- return Objects.equals(cluster, that.cluster) &&
- Objects.equals(getTopicPartition(), that.getTopicPartition());
+ return Objects.equals(partitionId, that.partitionId) &&
+ Objects.equals(cluster, that.cluster) &&
+ Objects.equals(topic, that.topic);
}
@Override
public int hashCode()
{
- return Objects.hash(cluster, getTopicPartition());
+ return Objects.hash(cluster, topic, partitionId);
}
@Override