You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:52 UTC
[07/36] git commit: KAFKA-987 Avoid checkpointing offsets in Kafka consumer that have not changed since the last commit; reviewed by Neha Narkhede
KAFKA-987 Avoid checkpointing offsets in Kafka consumer that have not changed since the last commit; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/401d5919
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/401d5919
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/401d5919
Branch: refs/heads/trunk
Commit: 401d59199cf727f5001dcec3eaa05fa9cdfd079e
Parents: ce7d588
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Jul 23 11:18:50 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Jul 23 11:19:24 2013 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/401d5919/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e3944d5..a2ea5a9 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -85,6 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+ private var checkpointedOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1)
private val messageStreamCreated = new AtomicBoolean(false)
@@ -249,15 +250,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
for (info <- infos.values) {
val newOffset = info.getConsumeOffset
- try {
- updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId,
- newOffset.toString)
- } catch {
- case t: Throwable =>
- // log it and let it go
- warn("exception during commitOffsets", t)
+ if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) {
+ try {
+ updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString)
+ checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset)
+ } catch {
+ case t: Throwable =>
+ // log it and let it go
+ warn("exception during commitOffsets", t)
+ }
+ debug("Committed offset " + newOffset + " for topic " + info)
}
- debug("Committed offset " + newOffset + " for topic " + info)
}
}
}
@@ -607,6 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
+ checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
}
}