You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/02/02 20:07:50 UTC
svn commit: r1239766 - in /incubator/kafka/trunk:
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/
core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaa...
Author: nehanarkhede
Date: Thu Feb 2 19:07:48 2012
New Revision: 1239766
URL: http://svn.apache.org/viewvc?rev=1239766&view=rev
Log:
KAFKA-256: Bug in the consumer rebalancing logic leads to the consumer not pulling data from some partitions; patched by nehanarkhede; reviewed by joelkoshy
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/trunk/core/src/test/resources/log4j.properties
incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/api/MultiFetchRequest.scala Thu Feb 2 19:07:48 2012
@@ -19,8 +19,6 @@ package kafka.api
import java.nio._
import kafka.network._
-import kafka.utils._
-import kafka.api._
object MultiFetchRequest {
def readFrom(buffer: ByteBuffer): MultiFetchRequest = {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Broker.scala Thu Feb 2 19:07:48 2012
@@ -17,11 +17,7 @@
package kafka.cluster
-import java.util.Arrays
import kafka.utils._
-import java.net.InetAddress
-import kafka.server.KafkaConfig
-import util.parsing.json.JSON
/**
* A Kafka broker
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/cluster/Cluster.scala Thu Feb 2 19:07:48 2012
@@ -17,7 +17,6 @@
package kafka.cluster
-import kafka.utils._
import scala.collection._
/**
@@ -33,7 +32,7 @@ private[kafka] class Cluster {
brokers.put(broker.id, broker)
}
- def getBroker(id: Int) = brokers.get(id).get
+ def getBroker(id: Int): Option[Broker] = brokers.get(id)
def add(broker: Broker) = brokers.put(broker.id, broker)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/common/ErrorMapping.scala Thu Feb 2 19:07:48 2012
@@ -17,7 +17,6 @@
package kafka.common
-import kafka.consumer._
import kafka.message.InvalidMessageException
import java.nio.ByteBuffer
import java.lang.Throwable
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala Thu Feb 2 19:07:48 2012
@@ -21,13 +21,11 @@ import scala.collection.mutable._
import scala.collection.JavaConversions._
import org.I0Itec.zkclient._
import joptsimple._
-import java.util.Arrays.asList
import java.util.Properties
import java.util.Random
import java.io.PrintStream
import kafka.message._
import kafka.utils.{Utils, Logging}
-import kafka.utils.ZkUtils
import kafka.utils.ZKStringSerializer
/**
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/Fetcher.scala Thu Feb 2 19:07:48 2012
@@ -22,6 +22,7 @@ import kafka.cluster._
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.BlockingQueue
import kafka.utils._
+import java.lang.IllegalStateException
/**
* The fetcher is a background thread that fetches data from a set of servers
@@ -73,7 +74,13 @@ private [consumer] class Fetcher(val con
// open a new fetcher thread for each broker
val ids = Set() ++ topicInfos.map(_.brokerId)
- val brokers = ids.map(cluster.getBroker(_))
+ val brokers = ids.map { id =>
+ cluster.getBroker(id) match {
+ case Some(broker) => broker
+ case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")
+ }
+ }
+
fetcherThreads = new Array[FetcherRunnable](brokers.size)
var i = 0
for(broker <- brokers) {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/TopicCount.scala Thu Feb 2 19:07:48 2012
@@ -21,7 +21,7 @@ import scala.collection._
import scala.util.parsing.json.JSON
import kafka.utils.Logging
-private[consumer] object TopicCount extends Logging {
+private[kafka] object TopicCount extends Logging {
val myConversionFunc = {input : String => input.toInt}
JSON.globalNumberParser = myConversionFunc
@@ -44,7 +44,7 @@ private[consumer] object TopicCount exte
}
-private[consumer] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
+private[kafka] class TopicCount(val consumerIdString: String, val topicCountMap: Map[String, Int]) {
def getConsumerThreadIdsPerTopic()
: Map[String, Set[String]] = {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Feb 2 19:07:48 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -30,6 +30,8 @@ import kafka.api.OffsetRequest
import java.util.UUID
import kafka.serializer.Decoder
import kafka.common.{ConsumerRebalanceFailedException, InvalidConfigException}
+import java.lang.IllegalStateException
+import kafka.utils.ZkUtils._
/**
* This class handles the consumers interaction with zookeeper
@@ -157,7 +159,7 @@ private[kafka] class ZookeeperConsumerCo
var consumerUuid : String = null
config.consumerId match {
- case Some(consumerId) // for testing only
+ case Some(consumerId) // for testing only
=> consumerUuid = consumerId
case None // generate unique consumerId automatically
=> val uuid = UUID.randomUUID()
@@ -193,7 +195,7 @@ private[kafka] class ZookeeperConsumerCo
ret.foreach { topicAndStreams =>
// register on broker partition path changes
- val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topicAndStreams._1
+ val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
}
@@ -204,7 +206,7 @@ private[kafka] class ZookeeperConsumerCo
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
- ZkUtils.createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+ createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
info("end registering consumer " + consumerIdString + " in ZK")
}
@@ -239,7 +241,7 @@ private[kafka] class ZookeeperConsumerCo
for (info <- infos.values) {
val newOffset = info.getConsumeOffset
try {
- ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
+ updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partition.name,
newOffset.toString)
}
catch {
@@ -289,7 +291,7 @@ private[kafka] class ZookeeperConsumerCo
try {
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
- val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+ val offsetString = readDataMaybeNull(zkClient, znode)
if (offsetString != null)
return offsetString.toLong
else
@@ -309,8 +311,12 @@ private[kafka] class ZookeeperConsumerCo
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
- val cluster = ZkUtils.getCluster(zkClient)
- val broker = cluster.getBroker(brokerId)
+ val cluster = getCluster(zkClient)
+ val broker = cluster.getBroker(brokerId) match {
+ case Some(b) => b
+ case None => throw new IllegalStateException("Broker " + brokerId + " is unavailable. Cannot issue " +
+ "getOffsetsBefore request")
+ }
simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize)
val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
@@ -358,7 +364,7 @@ private[kafka] class ZookeeperConsumerCo
loadBalancerListener.syncedRebalance
// There is no need to resubscribe to child and state changes.
- // The child change watchers will be set inside rebalance when we read the children list.
+ // The child change watchers will be set inside rebalance when we read the children list.
}
}
@@ -376,34 +382,17 @@ private[kafka] class ZookeeperConsumerCo
}
private def releasePartitionOwnership()= {
+ info("Releasing partition ownership")
for ((topic, infos) <- topicRegistry) {
val topicDirs = new ZKGroupTopicDirs(group, topic)
for(partition <- infos.keys) {
val znode = topicDirs.consumerOwnerDir + "/" + partition
- ZkUtils.deletePath(zkClient, znode)
+ deletePath(zkClient, znode)
debug("Consumer " + consumerIdString + " releasing " + znode)
}
}
}
- private def getConsumersPerTopic(group: String) : mutable.Map[String, List[String]] = {
- val consumers = ZkUtils.getChildrenParentMayNotExist(zkClient, dirs.consumerRegistryDir)
- val consumersPerTopicMap = new mutable.HashMap[String, List[String]]
- for (consumer <- consumers) {
- val topicCount = getTopicCount(consumer)
- for ((topic, consumerThreadIdSet) <- topicCount.getConsumerThreadIdsPerTopic()) {
- for (consumerThreadId <- consumerThreadIdSet)
- consumersPerTopicMap.get(topic) match {
- case Some(curConsumers) => consumersPerTopicMap.put(topic, consumerThreadId :: curConsumers)
- case _ => consumersPerTopicMap.put(topic, List(consumerThreadId))
- }
- }
- }
- for ( (topic, consumerList) <- consumersPerTopicMap )
- consumersPerTopicMap.put(topic, consumerList.sortWith((s,t) => s < t))
- consumersPerTopicMap
- }
-
private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
newPartMap: Map[String,List[String]],
oldPartMap: Map[String,List[String]],
@@ -416,11 +405,6 @@ private[kafka] class ZookeeperConsumerCo
relevantTopicThreadIdsMap
}
- private def getTopicCount(consumerId: String) : TopicCount = {
- val topicCountJson = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)
- TopicCount.constructTopicCount(consumerId, topicCountJson)
- }
-
def resetState() {
topicRegistry.clear
oldConsumersPerTopicMap.clear
@@ -432,19 +416,34 @@ private[kafka] class ZookeeperConsumerCo
for (i <- 0 until config.maxRebalanceRetries) {
info("begin rebalancing consumer " + consumerIdString + " try #" + i)
var done = false
+ val cluster = getCluster(zkClient)
try {
- done = rebalance()
+ done = rebalance(cluster)
}
catch {
case e =>
- // occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
- // For example, a ZK node can disappear between the time we get all children and the time we try to get
- // the value of a child. Just let this go since another rebalance will be triggered.
+ /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+ * For example, a ZK node can disappear between the time we get all children and the time we try to get
+ * the value of a child. Just let this go since another rebalance will be triggered.
+ **/
info("exception during rebalance ", e)
+ /* Explicitly make sure another rebalancing attempt will get triggered. */
+ done = false
}
info("end rebalancing consumer " + consumerIdString + " try #" + i)
- if (done)
+ if (done) {
return
+ }else {
+ /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+ * clear the cache */
+ info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+ oldConsumersPerTopicMap.clear()
+ oldPartitionsPerTopicMap.clear()
+ }
+ // commit offsets
+ commitOffsets()
+ // stop all fetchers and clear all the queues to avoid data duplication
+ closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
// release all partitions, reset state and retry
releasePartitionOwnership()
Thread.sleep(config.rebalanceBackoffMs)
@@ -454,26 +453,30 @@ private[kafka] class ZookeeperConsumerCo
throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.maxRebalanceRetries +" retries")
}
- private def rebalance(): Boolean = {
- val myTopicThreadIdsMap = getTopicCount(consumerIdString).getConsumerThreadIdsPerTopic
- val cluster = ZkUtils.getCluster(zkClient)
- val consumersPerTopicMap = getConsumersPerTopic(group)
- val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
+ private def rebalance(cluster: Cluster): Boolean = {
+ val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+ val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
+ val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
if (relevantTopicThreadIdsMap.size <= 0) {
- info("Consumer " + consumerIdString + " with " + consumersPerTopicMap + " doesn't need to rebalance.")
+ info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
+ format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
+ debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
+ debug("Consumers per topic cache " + oldConsumersPerTopicMap)
return true
}
- // fetchers must be stopped to avoid data duplication, since if the current
- // rebalancing attempt fails, the partitions that are released could be owned by another consumer.
- // But if we don't stop the fetchers first, this consumer would continue returning data for released
- // partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+ /**
+ * fetchers must be stopped to avoid data duplication, since if the current
+ * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
+ * But if we don't stop the fetchers first, this consumer would continue returning data for released
+ * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
+ */
closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
- info("Releasing partition ownership")
releasePartitionOwnership()
+ var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
topicRegistry.remove(topic)
topicRegistry.put(topic, new Pool[Partition, PartitionTopicInfo])
@@ -505,41 +508,60 @@ private[kafka] class ZookeeperConsumerCo
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
- if (ownPartition)
- info(consumerThreadId + " successfully owned partition " + partition)
- else
+ if (!ownPartition)
return false
+ else // record the partition ownership decision
+ partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
}
}
- updateFetcher(cluster, kafkaMessageStreams)
- oldPartitionsPerTopicMap = partitionsPerTopicMap
- oldConsumersPerTopicMap = consumersPerTopicMap
- true
+
+ /**
+ * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+ * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+ */
+ if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+ info("Updating the cache")
+ debug("Partitions per topic cache " + partitionsPerTopicMap)
+ debug("Consumers per topic cache " + consumersPerTopicMap)
+ oldPartitionsPerTopicMap = partitionsPerTopicMap
+ oldConsumersPerTopicMap = consumersPerTopicMap
+ updateFetcher(cluster, kafkaMessageStreams)
+ true
+ }else
+ false
}
- private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
- relevantTopicThreadIdsMap: Map[String, Set[String]]) {
- // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
- // after this rebalancing attempt
- val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+ private def closeFetchersForQueues(cluster: Cluster,
+ kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) => f.stopConnectionsToAllBrokers
- f.clearFetcherQueues(allPartitionInfos, cluster, queuesTobeCleared, kafkaMessageStreams)
+ f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
info("Committing all offsets after clearing the fetcher queues")
- // here, we need to commit offsets before stopping the consumer from returning any more messages
- // from the current data chunk. Since partition ownership is not yet released, this commit offsets
- // call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
- // for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
- // by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
- // successfully and the fetchers restart to fetch more data chunks
+ /**
+ * here, we need to commit offsets before stopping the consumer from returning any more messages
+ * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+ * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+ * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+ * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+ * successfully and the fetchers restart to fetch more data chunks
+ **/
commitOffsets
case None =>
}
}
+ private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ relevantTopicThreadIdsMap: Map[String, Set[String]]) {
+ // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
+ // after this rebalancing attempt
+ val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+ closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+ }
+
private def updateFetcher[T](cluster: Cluster,
kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
// update partitions for fetcher
@@ -560,18 +582,47 @@ private[kafka] class ZookeeperConsumerCo
private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
topic: String, consumerThreadId: String) : Boolean = {
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- try {
- ZkUtils.createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+ // check if some other consumer owns this partition at this time
+ val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
+ if(currentPartitionOwner != null) {
+ if(currentPartitionOwner.equals(consumerThreadId)) {
+ info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
+ addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+ true
+ }
+ else {
+ info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
+ false
+ }
+ } else {
+ addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
+ true
}
- catch {
- case e: ZkNodeExistsException =>
- // The node hasn't been deleted by the original owner. So wait a bit and retry.
- info("waiting for the partition ownership to be deleted: " + partition)
- return false
- case e2 => throw e2
+ }
+
+ private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+ val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
+ val topic = partitionOwner._1._1
+ val partition = partitionOwner._1._2
+ val consumerThreadId = partitionOwner._2
+ val topicDirs = new ZKGroupTopicDirs(group, topic)
+ val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+ try {
+ createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
+ info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+ true
+ }
+ catch {
+ case e: ZkNodeExistsException =>
+ // The node hasn't been deleted by the original owner. So wait a bit and retry.
+ info("waiting for the partition ownership to be deleted: " + partition)
+ false
+ case e2 => throw e2
+ }
}
- addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
- true
+ val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
+ if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */
+ else true
}
private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partitionString: String,
@@ -580,7 +631,7 @@ private[kafka] class ZookeeperConsumerCo
val partTopicInfoMap = topicRegistry.get(topic)
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
- val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
+ val offsetString = readDataMaybeNull(zkClient, znode)
// If first time starting a consumer, set the initial offset based on the config
var offset : Long = 0L
if (offsetString == null)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Thu Feb 2 19:07:48 2012
@@ -18,7 +18,7 @@
package kafka.consumer
import scala.collection.JavaConversions._
-import kafka.utils.{Utils, ZkUtils, ZKStringSerializer, Logging}
+import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
import kafka.server.KafkaServerStartable
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/Implicits.scala Thu Feb 2 19:07:48 2012
@@ -16,10 +16,8 @@
*/
package kafka.javaapi
-import java.nio.ByteBuffer
import kafka.serializer.Encoder
-import kafka.producer.{ProducerConfig, ProducerPool}
-import kafka.producer.async.{AsyncProducerConfig, QueueItem}
+import kafka.producer.async.QueueItem
import kafka.utils.Logging
private[javaapi] object Implicits extends Logging {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Thu Feb 2 19:07:48 2012
@@ -17,8 +17,7 @@
package kafka.javaapi.message
-import java.nio.channels._
-import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
+import kafka.message.{MessageAndOffset, InvalidMessageException}
/**
* A set of messages. A message set has a fixed serialized form, though the container
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/javaapi/producer/Producer.scala Thu Feb 2 19:07:48 2012
@@ -54,7 +54,6 @@ class Producer[K,V](config: ProducerConf
* partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
* object in the send API
*/
- import kafka.javaapi.Implicits._
def this(config: ProducerConfig,
encoder: Encoder[V],
eventHandler: kafka.javaapi.producer.async.EventHandler[V],
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala Thu Feb 2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.message
import java.io.InputStream
import java.nio.ByteBuffer
-import scala.Math
class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
override def read():Int = {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu Feb 2 19:07:48 2012
@@ -17,7 +17,6 @@
package kafka.message
-import scala.collection.mutable
import kafka.utils.Logging
import kafka.common.{InvalidMessageSizeException, ErrorMapping}
import java.nio.ByteBuffer
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/CompressionUtils.scala Thu Feb 2 19:07:48 2012
@@ -54,8 +54,8 @@ class GZIPCompression(inputStream: Input
}
class SnappyCompression(inputStream: InputStream,outputStream: ByteArrayOutputStream) extends CompressionFacade(inputStream,outputStream) {
- import org.xerial.snappy.{SnappyInputStream}
- import org.xerial.snappy.{SnappyOutputStream}
+ import org.xerial.snappy.SnappyInputStream
+ import org.xerial.snappy.SnappyOutputStream
val snappyIn:SnappyInputStream = if (inputStream == null) null else new SnappyInputStream(inputStream)
val snappyOut:SnappyOutputStream = if (outputStream == null) null else new SnappyOutputStream(outputStream)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/FileMessageSet.scala Thu Feb 2 19:07:48 2012
@@ -22,8 +22,6 @@ import java.nio._
import java.nio.channels._
import java.util.concurrent.atomic._
-import kafka._
-import kafka.message._
import kafka.utils._
/**
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/message/Message.scala Thu Feb 2 19:07:48 2012
@@ -18,9 +18,6 @@
package kafka.message
import java.nio._
-import java.nio.channels._
-import java.util.zip.CRC32
-import java.util.UUID
import kafka.utils._
import kafka.common.UnknownMagicByteException
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServer.scala Thu Feb 2 19:07:48 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import java.net._
import java.io._
-import java.nio._
import java.nio.channels._
import kafka.utils._
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/network/SocketServerStats.scala Thu Feb 2 19:07:48 2012
@@ -17,8 +17,6 @@
package kafka.network
-import java.util.concurrent.atomic._
-import javax.management._
import kafka.utils._
import kafka.api.RequestKeys
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu Feb 2 19:07:48 2012
@@ -18,14 +18,10 @@
package kafka.producer
import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
import joptsimple._
-import java.util.Arrays.asList
import java.util.Properties
-import java.util.Random
import java.io._
import kafka.message._
-import kafka.utils._
import kafka.serializer._
object ConsoleProducer {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu Feb 2 19:07:48 2012
@@ -19,7 +19,7 @@ package kafka.producer
import async.MissingConfigException
import org.apache.log4j.spi.{LoggingEvent, ErrorCode}
-import org.apache.log4j.{Logger, AppenderSkeleton}
+import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.{Utils, Logging}
import kafka.serializer.Encoder
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Thu Feb 2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.producer
import kafka.utils.Utils
import java.util.Properties
-import kafka.message.{CompressionUtils, CompressionCodec}
class SyncProducerConfig(val props: Properties) extends SyncProducerConfigShared {
/** the broker to which the producer sends events */
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Thu Feb 2 19:07:48 2012
@@ -17,7 +17,6 @@
package kafka.server
-import scala.reflect.BeanProperty
import kafka.log.LogManager
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/MultiMessageSetSend.scala Thu Feb 2 19:07:48 2012
@@ -17,10 +17,7 @@
package kafka.server
-import java.nio._
-import java.nio.channels._
import kafka.network._
-import kafka.message._
import kafka.utils._
/**
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/JmxTool.scala Thu Feb 2 19:07:48 2012
@@ -22,7 +22,7 @@ import java.util.Date
import java.text.SimpleDateFormat
import javax.management._
import javax.management.remote._
-import joptsimple.{OptionSet, OptionParser}
+import joptsimple.OptionParser
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.math._
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ProducerShell.scala Thu Feb 2 19:07:48 2012
@@ -17,12 +17,9 @@
package kafka.tools
-import java.net.URI
import java.io._
import joptsimple._
-import kafka.message._
import kafka.producer._
-import java.util.Properties
import kafka.utils.Utils
/**
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu Feb 2 19:07:48 2012
@@ -17,19 +17,17 @@
package kafka.tools
-import java.io.File
import joptsimple.OptionParser
-import org.apache.log4j.Logger
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.producer.async.DefaultEventHandler
-import kafka.serializer.{DefaultEncoder, StringEncoder}
+import kafka.serializer.DefaultEncoder
import kafka.producer.{ProducerData, DefaultPartitioner, ProducerConfig, Producer}
import kafka.consumer._
-import kafka.utils.{ZKStringSerializer, Utils, Logging}
+import kafka.utils.{ZKStringSerializer, Logging}
import kafka.api.OffsetRequest
import org.I0Itec.zkclient._
-import kafka.message.{CompressionCodec, Message, MessageSet, FileMessageSet}
+import kafka.message.{CompressionCodec, Message}
object ReplayLogProducer extends Logging {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala Thu Feb 2 19:07:48 2012
@@ -22,7 +22,6 @@ import joptsimple._
import kafka.api.FetchRequest
import kafka.utils._
import kafka.consumer._
-import kafka.server._
/**
* Command line program to dump out messages to standard out using the simple consumer
Added: incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1239766&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu Feb 2 19:07:48 2012
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import joptsimple.OptionParser
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils, ZKStringSerializer}
+
+/**
+ * Command line tool that checks whether every available partition of the topics consumed by a
+ * consumer group currently has a valid owner registered in ZooKeeper.
+ *
+ * Options: --zk.connect (defaults to localhost:2181), --group (required) and --help.
+ * The verdict is reported through the logger only; a failed verification does not change the
+ * exit code of the process.
+ */
+object VerifyConsumerRebalance extends Logging {
+  /**
+   * Parses the command line, connects to ZooKeeper and logs the outcome of the verification.
+   * Exits with status 0 after printing help and with status 1 when --group is missing.
+   */
+  def main(args: Array[String]) {
+    val parser = new OptionParser()
+
+    val zkConnectOpt = parser.accepts("zk.connect", "ZooKeeper connect string.").
+      withRequiredArg().defaultsTo("localhost:2181").ofType(classOf[String]);
+    val groupOpt = parser.accepts("group", "Consumer group.").
+      withRequiredArg().ofType(classOf[String])
+    parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has("help")) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    // --group has no default value, so it has to be supplied explicitly
+    for (opt <- List(groupOpt))
+      if (!options.has(opt)) {
+        System.err.println("Missing required argument: %s".format(opt))
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val group = options.valueOf(groupOpt)
+
+    // declared outside the try block so that the finally clause can close the client
+    var zkClient: ZkClient = null
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+
+      debug("zkConnect = %s; group = %s".format(zkConnect, group))
+
+      // check if the rebalancing operation succeeded.
+      try {
+        if(validateRebalancingOperation(zkClient, group))
+          info("Rebalance operation successful !")
+        else
+          error("Rebalance operation failed !")
+      } catch {
+        // any failure during verification is logged instead of being propagated
+        case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
+      }
+    }
+    finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  /**
+   * Walks over every topic that the consumers of the group have registered for and checks that
+   * each partition of that topic has an owner node whose value is one of the consumer ids
+   * registered for the topic. Every violation is logged as an error.
+   *
+   * @param zkClient connected ZooKeeper client
+   * @param group    consumer group to verify
+   * @return true if no violation was found, false otherwise
+   */
+  private def validateRebalancingOperation(zkClient: ZkClient, group: String): Boolean = {
+    info("Verifying rebalancing operation for consumer group " + group)
+    var rebalanceSucceeded: Boolean = true
+    /**
+     * A successful rebalancing operation would select an owner for each available partition
+     * This means that for each partition registered under /brokers/topics/[topic]/[broker-id], an owner exists
+     * under /consumers/[consumer_group]/owners/[topic]/[broker_id-partition_id]
+     */
+    val consumersPerTopicMap = ZkUtils.getConsumersPerTopic(zkClient, group)
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, consumersPerTopicMap.keys.iterator)
+
+    partitionsPerTopicMap.foreach { partitionsForTopic =>
+      val topic = partitionsForTopic._1
+      val partitions = partitionsForTopic._2
+      val topicDirs = new ZKGroupTopicDirs(group, topic)
+      val consumerIdsForTopic = consumersPerTopicMap.get(topic)
+      info("Alive partitions for topic %s are %s ".format(topic, partitions.toString))
+      info("Alive consumers for topic %s => %s ".format(topic, consumerIdsForTopic))
+      val partitionsWithOwners = ZkUtils.getChildrenParentMayNotExist(zkClient, topicDirs.consumerOwnerDir)
+      if(partitionsWithOwners.size == 0) {
+        error("No owners for any partitions for topic " + topic)
+        rebalanceSucceeded = false
+      }
+      debug("Children of " + topicDirs.consumerOwnerDir + " = " + partitionsWithOwners.toString)
+
+      // for each available partition for topic, check if an owner exists
+      partitions.foreach { partition =>
+        // check if there is a node for [partition]
+        if(!partitionsWithOwners.exists(p => p.equals(partition))) {
+          error("No owner for topic %s partition %s".format(topic, partition))
+          rebalanceSucceeded = false
+        }
+        // try reading the partition owner path to see if a valid consumer id exists there
+        val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
+        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
+        if(partitionOwner == null) {
+          error("No owner for topic %s partition %s".format(topic, partition))
+          rebalanceSucceeded = false
+        }
+        else {
+          // check if the owner is a valid consumer id
+          consumerIdsForTopic match {
+            case Some(consumerIds) =>
+              if(!consumerIds.contains(partitionOwner)) {
+                // the concatenation is parenthesized so that format() fills in the whole message;
+                // a method call binds tighter than '+', so without the parentheses only the
+                // trailing "group %s" literal would be formatted
+                error(("Owner %s for topic %s partition %s is not a valid member of consumer " +
+                  "group %s").format(partitionOwner, topic, partition, group))
+                rebalanceSucceeded = false
+              }
+              else
+                info("Owner of topic %s partition %s is %s".format(topic, partition, partitionOwner))
+            case None => {
+              error("No consumer ids registered for topic " + topic)
+              rebalanceSucceeded = false
+            }
+          }
+        }
+      }
+    }
+
+    rebalanceSucceeded
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Thu Feb 2 19:07:48 2012
@@ -19,7 +19,6 @@ package kafka.utils
import java.util.concurrent._
import java.util.concurrent.atomic._
-import kafka.utils._
/**
* A scheduler for running jobs in the background
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Range.scala Thu Feb 2 19:07:48 2012
@@ -17,9 +17,8 @@
package kafka.utils
-import scala.math._
-/**
+/**
* A generic range value with a start and end
*/
trait Range {
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Thu Feb 2 19:07:48 2012
@@ -21,6 +21,7 @@ import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{SimpleConsumer, ConsumerConfig}
import kafka.cluster.Partition
import kafka.api.OffsetRequest
+import java.lang.IllegalStateException
/**
* A utility that updates the offset of every broker partition to the offset of latest log segment file, in ZK.
@@ -55,7 +56,11 @@ object UpdateOffsetsInZK {
var numParts = 0
for (partString <- partitions) {
val part = Partition.parse(partString)
- val broker = cluster.getBroker(part.brokerId)
+ val broker = cluster.getBroker(part.brokerId) match {
+ case Some(b) => b
+ case None => throw new IllegalStateException("Broker " + part.brokerId + " is unavailable. Cannot issue " +
+ "getOffsetsBefore request")
+ }
val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100 * 1024)
val offsets = consumer.getOffsetsBefore(topic, part.partId, offsetOption, 1)
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Thu Feb 2 19:07:48 2012
@@ -23,7 +23,6 @@ import java.nio.channels._
import java.util.concurrent.atomic._
import java.lang.management._
import java.util.zip.CRC32
-import org.apache.log4j.Logger
import javax.management._
import java.util.Properties
import scala.collection._
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Feb 2 19:07:48 2012
@@ -23,6 +23,7 @@ import kafka.cluster.{Broker, Cluster}
import scala.collection._
import java.util.Properties
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import kafka.consumer.TopicCount
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@@ -236,6 +237,44 @@ object ZkUtils extends Logging {
val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
zkClient.delete(brokerPartTopicPath)
}
+
+ /**
+  * Returns the children of the consumer registry directory of the given consumer group.
+  */
+ def getConsumersInGroup(zkClient: ZkClient, group: String): Seq[String] =
+   getChildren(zkClient, new ZKGroupDirs(group).consumerRegistryDir)
+
+ /**
+  * Reads the registration data stored for a consumer under the registry directory of its group
+  * and builds a TopicCount from it.
+  */
+ def getTopicCount(zkClient: ZkClient, group: String, consumerId: String) : TopicCount = {
+   val registrationPath = new ZKGroupDirs(group).consumerRegistryDir + "/" + consumerId
+   TopicCount.constructTopicCount(consumerId, ZkUtils.readData(zkClient, registrationPath))
+ }
+
+ /**
+  * Maps every consumer id registered in the group to the TopicCount built from its
+  * registration data.
+  */
+ def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
+   val groupDirs = new ZKGroupDirs(group)
+   val consumerIds = getConsumersInGroup(zkClient, group)
+   // build the (consumer id, topic count) pairs directly, one registration read per consumer
+   val topicCountPerConsumer = consumerIds.map { consumerId =>
+     val registration = ZkUtils.readData(zkClient, groupDirs.consumerRegistryDir + "/" + consumerId)
+     (consumerId, TopicCount.constructTopicCount(consumerId, registration))
+   }
+   topicCountPerConsumer.toMap
+ }
+
+ /**
+  * Collects, for every topic, the consumer thread ids that the consumers of the group have
+  * registered for it. The thread ids of each topic are returned in ascending order.
+  */
+ def getConsumersPerTopic(zkClient: ZkClient, group: String) : mutable.Map[String, List[String]] = {
+   val groupDirs = new ZKGroupDirs(group)
+   val threadIdsByTopic = new mutable.HashMap[String, List[String]]
+   getChildrenParentMayNotExist(zkClient, groupDirs.consumerRegistryDir).foreach { consumer =>
+     val threadIdsPerTopic = getTopicCount(zkClient, group, consumer).getConsumerThreadIdsPerTopic()
+     // prepend each thread id to whatever has been collected for its topic so far
+     for ((topic, threadIds) <- threadIdsPerTopic; threadId <- threadIds)
+       threadIdsByTopic.put(topic, threadId :: threadIdsByTopic.getOrElse(topic, Nil))
+   }
+   // only values of existing keys are replaced here, no key is added or removed
+   for ((topic, threadIds) <- threadIdsByTopic)
+     threadIdsByTopic.put(topic, threadIds.sortWith(_ < _))
+   threadIdsByTopic
+ }
}
object ZKStringSerializer extends ZkSerializer {
Modified: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (original)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Thu Feb 2 19:07:48 2012
@@ -21,4 +21,5 @@ log4j.appender.stdout.layout.ConversionP
log4j.logger.kafka=WARN
# zkclient can be verbose, during debugging it is common to adjust it separately
-log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
\ No newline at end of file
+log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
+log4j.logger.org.apache.zookeeper=WARN
Modified: incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh (original)
+++ incubator/kafka/trunk/system_test/broker_failure/bin/run-test.sh Thu Feb 2 19:07:48 2012
@@ -68,7 +68,7 @@ readonly test_start_time="$(date +%s)"
readonly num_msg_per_batch=500
readonly batches_per_iteration=5
-readonly num_iterations=10
+readonly num_iterations=12
readonly zk_source_port=2181
readonly zk_mirror_port=2182
@@ -132,6 +132,8 @@ producer_performance_crc_log=$base_dir/p
producer_performance_crc_sorted_log=$base_dir/producer_performance_crc_sorted.log
producer_performance_crc_sorted_uniq_log=$base_dir/producer_performance_crc_sorted_uniq.log
+consumer_rebalancing_log=$base_dir/consumer_rebalancing_verification.log
+
consumer_prop_file=$base_dir/config/whitelisttest.consumer.properties
checksum_diff_log=$base_dir/checksum_diff.log
@@ -173,6 +175,17 @@ get_random_range() {
return $(($(($RANDOM % range)) + $lo))
}
+# Runs kafka.tools.VerifyConsumerRebalance for consumer group $consumer_grp and appends
+# everything the tool prints to $consumer_rebalancing_log.
+verify_consumer_rebalancing() {
+
+    info "Verifying consumer rebalancing operation"
+
+    # redirections are processed left to right: stdout has to be pointed at the log file
+    # first, otherwise "2>&1" duplicates the original stdout and stderr never reaches the log
+    $base_dir/bin/kafka-run-class.sh \
+        kafka.tools.VerifyConsumerRebalance \
+        --zk.connect=localhost:2181 \
+        --group $consumer_grp \
+        >> $consumer_rebalancing_log 2>&1
+}
+
wait_for_zero_consumer_lags() {
# no of times to check for zero lagging
@@ -618,6 +631,7 @@ start_test() {
sleep $wait_time_after_restarting_broker
fi
fi
+ verify_consumer_rebalancing
else
info "No bouncing performed"
fi
@@ -662,6 +676,8 @@ start_console_consumer_for_mirror_produc
wait_for_zero_source_console_consumer_lags
wait_for_zero_mirror_console_consumer_lags
+verify_consumer_rebalancing
+
shutdown_servers
cmp_checksum
Modified: incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties?rev=1239766&r1=1239765&r2=1239766&view=diff
==============================================================================
--- incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties (original)
+++ incubator/kafka/trunk/system_test/broker_failure/config/log4j.properties Thu Feb 2 19:07:48 2012
@@ -26,13 +26,15 @@ log4j.appender.stdout.layout.ConversionP
# Turn on all our debugging info
#log4j.logger.kafka=INFO
-#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO
+log4j.logger.org.apache.zookeeper=INFO
log4j.logger.kafka.consumer=DEBUG
log4j.logger.kafka.server.EmbeddedConsumer$MirroringThread=TRACE
log4j.logger.kafka.server.KafkaRequestHandlers=TRACE
#log4j.logger.kafka.producer.async.AsyncProducer=TRACE
#log4j.logger.kafka.producer.async.ProducerSendThread=TRACE
log4j.logger.kafka.producer.async.DefaultEventHandler=TRACE
+log4j.logger.kafka.tools.VerifyConsumerRebalance=DEBUG
# to print message checksum from ProducerPerformance
log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG