You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/09/05 21:20:49 UTC
git commit: KAFKA-1510; Do full (unfiltered) offset commits when offsets storage is set to Kafka; reviewed by Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/trunk ffb81a581 -> e22f8eded
KAFKA-1510; Do full (unfiltered) offset commits when offsets storage is set to Kafka; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e22f8ede
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e22f8ede
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e22f8ede
Branch: refs/heads/trunk
Commit: e22f8ededaf838e7cec9dd22975d8461764ab076
Parents: ffb81a5
Author: Nicu Marasoiu <nm...@adobe.com>
Authored: Fri Sep 5 12:20:31 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Sep 5 12:20:31 2014 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 26 +++++++++-----------
1 file changed, 11 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e22f8ede/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 21f3e00..fbc680f 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -89,7 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
+ private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
@@ -277,9 +277,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) {
- val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
- updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
- zkCommitMeter.mark()
+ if (checkpointedZkOffsets.get(topicPartition) != offset) {
+ val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+ updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+ checkpointedZkOffsets.put(topicPartition, offset)
+ zkCommitMeter.mark()
+ }
}
def commitOffsets(isAutoCommit: Boolean = true) {
@@ -289,10 +292,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
while (!done) {
val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
val offsetsToCommit = immutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
- partitionTopicInfos.filterNot { case (partition, info) =>
- val newOffset = info.getConsumeOffset()
- newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
- }.map { case (partition, info) =>
+ partitionTopicInfos.map { case (partition, info) =>
TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
}
}.toSeq:_*)
@@ -301,7 +301,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
if (config.offsetsStorage == "zookeeper") {
offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
- checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset)
}
true
} else {
@@ -316,12 +315,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) =>
- if (errorCode == ErrorMapping.NoError) {
- val offset = offsetsToCommit(topicPartition).offset
- checkpointedOffsets.put(topicPartition, offset)
- if (config.dualCommitEnabled) {
+ if (errorCode == ErrorMapping.NoError && config.dualCommitEnabled) {
+ val offset = offsetsToCommit(topicPartition).offset
commitOffsetToZooKeeper(topicPartition, offset)
- }
}
(folded._1 || // update commitFailed
@@ -808,7 +804,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
config.clientId)
partTopicInfoMap.put(partition, partTopicInfo)
debug(partTopicInfo + " selected new offset " + offset)
- checkpointedOffsets.put(TopicAndPartition(topic, partition), offset)
+ checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
}
}