You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/20 09:56:50 UTC
[4/7] incubator-apex-malhar git commit: APEXMALHAR-1973 #comment
disable committing offsets for latest|earliest and store the offset for next
message
APEXMALHAR-1973 #comment disable committing offsets for latest|earliest and store the offset for next message
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/ca1c7e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/ca1c7e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/ca1c7e60
Branch: refs/heads/release-3.3
Commit: ca1c7e60f91779d7c19ecc70baa997d7ae2a2c47
Parents: be71996
Author: Siyuan Hua <hs...@apache.org>
Authored: Wed Jan 13 17:07:18 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Sat Feb 20 00:17:42 2016 -0800
----------------------------------------------------------------------
.../apache/apex/malhar/kafka/AbstractKafkaInputOperator.java | 4 +++-
.../apache/apex/malhar/kafka/AbstractKafkaPartitioner.java | 7 ++++---
2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index fbff2e7..512f058 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -179,6 +179,8 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
@Override
public void committed(long windowId)
{
+ if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+ return;
//ask kafka consumer wrapper to store the committed offsets
for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -204,7 +206,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
emitTuple(tuple.getLeft(), msg);
AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
msg.topic(), msg.partition());
- offsetTrack.put(pm, msg.offset());
+ offsetTrack.put(pm, msg.offset() + 1);
}
emitCount += count;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/ca1c7e60/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index c708145..53bbd2a 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -258,14 +258,15 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
return false;
}
PartitionMeta that = (PartitionMeta)o;
- return Objects.equals(cluster, that.cluster) &&
- Objects.equals(getTopicPartition(), that.getTopicPartition());
+ return Objects.equals(partitionId, that.partitionId) &&
+ Objects.equals(cluster, that.cluster) &&
+ Objects.equals(topic, that.topic);
}
@Override
public int hashCode()
{
- return Objects.hash(cluster, getTopicPartition());
+ return Objects.hash(cluster, topic, partitionId);
}
@Override