You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:52 UTC

[07/36] git commit: KAFKA-987 Avoid checkpointing offsets in Kafka consumer that have not changed since the last commit; reviewed by Neha Narkhede

KAFKA-987 Avoid checkpointing offsets in the Kafka consumer when they have not changed since the last commit; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/401d5919
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/401d5919
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/401d5919

Branch: refs/heads/trunk
Commit: 401d59199cf727f5001dcec3eaa05fa9cdfd079e
Parents: ce7d588
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Jul 23 11:18:50 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Jul 23 11:19:24 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/401d5919/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index e3944d5..a2ea5a9 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -85,6 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+  private var checkpointedOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(1)
   private val messageStreamCreated = new AtomicBoolean(false)
@@ -249,15 +250,17 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       for (info <- infos.values) {
         val newOffset = info.getConsumeOffset
-        try {
-          updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId,
-            newOffset.toString)
-        } catch {
-          case t: Throwable =>
-          // log it and let it go
-            warn("exception during commitOffsets",  t)
+        if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) {
+          try {
+            updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString)
+            checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset)
+          } catch {
+            case t: Throwable =>
+              // log it and let it go
+              warn("exception during commitOffsets",  t)
+          }
+          debug("Committed offset " + newOffset + " for topic " + info)
         }
-        debug("Committed offset " + newOffset + " for topic " + info)
       }
     }
   }
@@ -607,6 +610,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
+      checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
     }
   }