Posted to commits@kafka.apache.org by jj...@apache.org on 2014/09/05 21:20:49 UTC

git commit: KAFKA-1510; Do full (unfiltered) offset commits when offsets storage is set to Kafka; reviewed by Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk ffb81a581 -> e22f8eded


KAFKA-1510; Do full (unfiltered) offset commits when offsets storage is set to Kafka; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e22f8ede
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e22f8ede
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e22f8ede

Branch: refs/heads/trunk
Commit: e22f8ededaf838e7cec9dd22975d8461764ab076
Parents: ffb81a5
Author: Nicu Marasoiu <nm...@adobe.com>
Authored: Fri Sep 5 12:20:31 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Sep 5 12:20:31 2014 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 26 +++++++++-----------
 1 file changed, 11 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
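In short: before this commit, commitOffsets() dropped any partition whose consume offset already matched the last checkpointed value, regardless of which offsets storage was configured. After it, the offset map is built unfiltered, and the dedup check lives only in commitOffsetToZooKeeper() (checkpointedOffsets is renamed checkpointedZkOffsets to match its narrowed role). A minimal illustration of the difference, using stand-in types rather than the real consumer internals:

  case class TopicAndPartition(topic: String, partition: Int)

  val consumeOffsets = Map(TopicAndPartition("t", 0) -> 42L)  // latest consumed offsets
  val checkpointed   = Map(TopicAndPartition("t", 0) -> 42L)  // last committed offsets

  // Before: unchanged offsets were filtered out of every commit request,
  // so an idle partition's offset was never re-committed anywhere.
  val filtered = consumeOffsets.filterNot { case (tp, off) => checkpointed.get(tp).contains(off) }

  // After: the full, unfiltered map goes into the commit request, so with
  // Kafka-based storage every commit refreshes every owned partition's offset.
  val unfiltered = consumeOffsets
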


http://git-wip-us.apache.org/repos/asf/kafka/blob/e22f8ede/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 21f3e00..fbc680f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -89,7 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-  private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
+  private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
@@ -277,9 +277,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   }
 
   def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
-    val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
-    updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
-    zkCommitMeter.mark()
+    if (checkpointedZkOffsets.get(topicPartition) != offset) {
+      val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+      updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+      checkpointedZkOffsets.put(topicPartition, offset)
+      zkCommitMeter.mark()
+    }
   }
 
   def commitOffsets(isAutoCommit: Boolean = true) {
@@ -289,10 +292,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     while (!done) {
       val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
         val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
-          partitionTopicInfos.filterNot { case (partition, info) =>
-            val newOffset = info.getConsumeOffset()
-            newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
-          }.map { case (partition, info) =>
+          partitionTopicInfos.map { case (partition, info) =>
             TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
           }
         }.toSeq:_*)
@@ -301,7 +301,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           if (config.offsetsStorage == "zookeeper") {
             offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
               commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
-              checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset)
             }
             true
           } else {
@@ -316,12 +315,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
                 offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) =>
 
-                  if (errorCode == ErrorMapping.NoError) {
-                    val offset = offsetsToCommit(topicPartition).offset
-                    checkpointedOffsets.put(topicPartition, offset)
-                    if (config.dualCommitEnabled) {
+                  if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
+                      val offset = offsetsToCommit(topicPartition).offset
                       commitOffsetToZooKeeper(topicPartition, offset)
-                    }
                   }
 
                   (folded._1 || // update commitFailed
@@ -808,7 +804,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                  config.clientId)
       partTopicInfoMap.put(partition, partTopicInfo)
       debug(partTopicInfo + " selected new offset " + offset)
-      checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
+      checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
     }
   }
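
Taken together, the change splits the two storage paths: Kafka-based commits are now full and unfiltered on every interval, while ZooKeeper writes remain deduplicated against checkpointedZkOffsets. A self-contained sketch of that interplay (hypothetical object and counter names, stand-in types; TrieMap stands in for kafka.utils.Pool, and the ZooKeeper write is reduced to a counter):

  import scala.collection.concurrent.TrieMap

  object DedupCommitSketch {
    case class TopicAndPartition(topic: String, partition: Int)

    private val checkpointedZkOffsets = TrieMap.empty[TopicAndPartition, Long]
    private var zkWrites = 0
    private var kafkaCommits = 0

    // Mirrors the relocated guard: "write to ZooKeeper" only when the offset changed.
    def commitOffsetToZooKeeper(tp: TopicAndPartition, offset: Long): Unit = {
      if (!checkpointedZkOffsets.get(tp).contains(offset)) {
        checkpointedZkOffsets.put(tp, offset)
        zkWrites += 1
      }
    }

    // Mirrors the new commitOffsets: the Kafka path sends every offset, unfiltered;
    // dual commit additionally funnels each offset through the deduped ZK path.
    def commitOffsets(offsets: Map[TopicAndPartition, Long], dualCommit: Boolean): Unit = {
      kafkaCommits += offsets.size
      if (dualCommit) offsets.foreach { case (tp, o) => commitOffsetToZooKeeper(tp, o) }
    }

    def main(args: Array[String]): Unit = {
      val tp = TopicAndPartition("t", 0)
      commitOffsets(Map(tp -> 42L), dualCommit = true)
      commitOffsets(Map(tp -> 42L), dualCommit = true) // same offset again
      println(s"kafkaCommits=$kafkaCommits zkWrites=$zkWrites") // kafkaCommits=2 zkWrites=1
    }
  }

With dual commit enabled, the second (unchanged) commit still reaches the Kafka path but produces no ZooKeeper write, which is exactly the asymmetry the diff introduces.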