Posted to commits@kafka.apache.org by jo...@apache.org on 2012/05/31 03:51:27 UTC
svn commit: r1344526 [2/3] - in /incubator/kafka/branches/0.8: ./
clients/cpp/src/ contrib/hadoop-consumer/src/main/java/kafka/etl/
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
contrib/hadoop-producer/ contrib/hadoop-producer/src/main/java/kaf...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu May 31 01:51:23 2012
@@ -19,6 +19,7 @@ package kafka.consumer
import java.util.concurrent._
import java.util.concurrent.atomic._
+import locks.ReentrantLock
import scala.collection._
import kafka.cluster._
import kafka.utils._
@@ -33,6 +34,7 @@ import java.lang.IllegalStateException
import kafka.utils.ZkUtils._
import kafka.common.{NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+
/**
* This class handles the consumer's interaction with zookeeper
*
@@ -85,16 +87,37 @@ trait ZookeeperConsumerConnectorMBean {
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
- extends ConsumerConnector with ZookeeperConsumerConnectorMBean with Logging {
-
+ extends ConsumerConnector with ZookeeperConsumerConnectorMBean
+ with Logging {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[Fetcher] = None
private var zkClient: ZkClient = null
- private val topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
- // queues : (topic,consumerThreadId) -> queue
- private val queues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
+ private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+ // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
+ private val topicThreadIdAndQueues = new Pool[Tuple2[String,String], BlockingQueue[FetchedDataChunk]]
private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+ private val messageStreamCreated = new AtomicBoolean(false)
+
+ private var sessionExpirationListener: ZKSessionExpireListener = null
+ private var loadBalancerListener: ZKRebalancerListener = null
+
+ private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
+
+ val consumerIdString = {
+ var consumerUuid : String = null
+ config.consumerId match {
+ case Some(consumerId) // for testing only
+ => consumerUuid = consumerId
+ case None // generate unique consumerId automatically
+ => val uuid = UUID.randomUUID()
+ consumerUuid = "%s-%d-%s".format(
+ InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
+ uuid.getMostSignificantBits().toHexString.substring(0,8))
+ }
+ config.groupId + "_" + consumerUuid
+ }
+ this.logIdent = consumerIdString + " "
connectZk()
createFetcher()
@@ -106,10 +129,18 @@ private[kafka] class ZookeeperConsumerCo
def this(config: ConsumerConfig) = this(config, true)
def createMessageStreams[T](topicCountMap: Map[String,Int], decoder: Decoder[T])
- : Map[String,List[KafkaMessageStream[T]]] = {
+ : Map[String,List[KafkaStream[T]]] = {
+ if (messageStreamCreated.getAndSet(true))
+ throw new RuntimeException(this.getClass.getSimpleName +
+ " can create message streams at most once")
consume(topicCountMap, decoder)
}
+ def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) = {
+ val wildcardStreamsHandler = new WildcardStreamsHandler[T](topicFilter, numStreams, decoder)
+ wildcardStreamsHandler.streams
+ }
+
private def createFetcher() {
if (enableFetcher)
fetcher = Some(new Fetcher(config, zkClient))
@@ -124,6 +155,9 @@ private[kafka] class ZookeeperConsumerCo
val canShutdown = isShuttingDown.compareAndSet(false, true);
if (canShutdown) {
info("ZKConsumerConnector shutting down")
+
+ if (wildcardTopicWatcher != null)
+ wildcardTopicWatcher.shutdown()
try {
scheduler.shutdownNow()
fetcher match {
@@ -146,59 +180,30 @@ private[kafka] class ZookeeperConsumerCo
}
def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
- : Map[String,List[KafkaMessageStream[T]]] = {
+ : Map[String,List[KafkaStream[T]]] = {
debug("entering consume ")
if (topicCountMap == null)
throw new RuntimeException("topicCountMap is null")
- val dirs = new ZKGroupDirs(config.groupId)
- var ret = new mutable.HashMap[String,List[KafkaMessageStream[T]]]
-
- var consumerUuid : String = null
- config.consumerId match {
- case Some(consumerId) => // for testing only
- consumerUuid = consumerId
- case None => // generate unique consumerId automatically
- val uuid = UUID.randomUUID()
- consumerUuid = "%s-%d-%s".format( InetAddress.getLocalHost.getHostName,
- System.currentTimeMillis,
- uuid.getMostSignificantBits().toHexString.substring(0,8) )
- }
- val consumerIdString = config.groupId + "_" + consumerUuid
- val topicCount = new TopicCount(consumerIdString, topicCountMap)
-
- // create a queue per topic per consumer thread
- val consumerThreadIdsPerTopic = topicCount.getConsumerThreadIdsPerTopic
- for ((topic, threadIdSet) <- consumerThreadIdsPerTopic) {
- var streamList: List[KafkaMessageStream[T]] = Nil
- for (threadId <- threadIdSet) {
- val stream = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
- queues.put((topic, threadId), stream)
- streamList ::= new KafkaMessageStream[T](topic, stream, config.consumerTimeoutMs, decoder)
- }
- ret += (topic -> streamList)
- debug("adding topic " + topic + " and stream to map..")
- }
+ val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
- // listener to consumer and partition changes
- val loadBalancerListener = new ZKRebalancerListener[T](config.groupId, consumerIdString, ret)
- registerConsumerInZK(dirs, consumerIdString, topicCount)
+ val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
- // register listener for session expired event
- zkClient.subscribeStateChanges(
- new ZKSessionExpireListener[T](dirs, consumerIdString, topicCount, loadBalancerListener))
-
- zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+ // make a list of (queue,stream) pairs, one pair for each threadId
+ val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
+ threadIdSet.map(_ => {
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val stream = new KafkaStream[T](
+ queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ (queue, stream)
+ })
+ ).flatten.toList
- ret.foreach { topicAndStreams =>
- // register on broker partition path changes
- val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
- zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
- }
+ val dirs = new ZKGroupDirs(config.groupId)
+ registerConsumerInZK(dirs, consumerIdString, topicCount)
+ reinitializeConsumer(topicCount, queuesAndStreams)
- // explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance()
- ret
+ loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[T]]]]
}
// this API is used by unit tests only
@@ -206,12 +211,14 @@ private[kafka] class ZookeeperConsumerCo
private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
- createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, topicCount.toJsonString)
+ createEphemeralPathExpectConflict(zkClient,
+ dirs.consumerRegistryDir + "/" + consumerIdString,
+ topicCount.dbString)
info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
- for (queue <- queues.values) {
+ for (queue <- topicThreadIdAndQueues.values) {
debug("Clearing up queue")
queue.clear()
queue.put(ZookeeperConsumerConnector.shutdownCommand)
@@ -330,10 +337,10 @@ private[kafka] class ZookeeperConsumerCo
producedOffset
}
- class ZKSessionExpireListener[T](val dirs: ZKGroupDirs,
+ class ZKSessionExpireListener(val dirs: ZKGroupDirs,
val consumerIdString: String,
val topicCount: TopicCount,
- val loadBalancerListener: ZKRebalancerListener[T])
+ val loadBalancerListener: ZKRebalancerListener)
extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
@@ -355,10 +362,10 @@ private[kafka] class ZookeeperConsumerCo
* consumer in the consumer registry and trigger a rebalance.
*/
info("ZK expired; release old broker parition ownership; re-register consumer " + consumerIdString)
- loadBalancerListener.resetState
+ loadBalancerListener.resetState()
registerConsumerInZK(dirs, consumerIdString, topicCount)
// explicitly trigger load balancing for this consumer
- loadBalancerListener.syncedRebalance
+ loadBalancerListener.syncedRebalance()
// There is no need to resubscribe to child and state changes.
// The child change watchers will be set inside rebalance when we read the children list.
@@ -366,45 +373,69 @@ private[kafka] class ZookeeperConsumerCo
}
- class ZKRebalancerListener[T](val group: String, val consumerIdString: String,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]])
+ class ZKRebalancerListener(val group: String, val consumerIdString: String,
+ val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
extends IZkChildListener {
- private val dirs = new ZKGroupDirs(group)
private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
+ private var isWatcherTriggered = false
+ private val lock = new ReentrantLock
+ private val cond = lock.newCondition()
+ private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
+ override def run() {
+ info("starting watcher executor thread for consumer " + consumerIdString)
+ var doRebalance = false
+ while (!isShuttingDown.get) {
+ try {
+ lock.lock()
+ try {
+ if (!isWatcherTriggered)
+ cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
+ } finally {
+ doRebalance = isWatcherTriggered
+ isWatcherTriggered = false
+ lock.unlock()
+ }
+ if (doRebalance)
+ syncedRebalance
+ } catch {
+ case t => error("error during syncedRebalance", t)
+ }
+ }
+ info("stopping watcher executor thread for consumer " + consumerIdString)
+ }
+ }
+ watcherExecutorThread.start()
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
- syncedRebalance
+ lock.lock()
+ try {
+ isWatcherTriggered = true
+ cond.signalAll()
+ } finally {
+ lock.unlock()
+ }
}
- private def releasePartitionOwnership()= {
- info("Releasing partition ownership")
- for ((topic, infos) <- topicRegistry) {
- for(partition <- infos.keys) {
- val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition.toString)
- deletePath(zkClient, partitionOwnerPath)
- debug("Consumer " + consumerIdString + " releasing " + partitionOwnerPath)
- }
- }
+ private def deletePartitionOwnershipFromZK(topic: String, partition: String) {
+ val topicDirs = new ZKGroupTopicDirs(group, topic)
+ val znode = topicDirs.consumerOwnerDir + "/" + partition
+ deletePath(zkClient, znode)
+ debug("Consumer " + consumerIdString + " releasing " + znode)
}
- private def getRelevantTopicMap(myTopicThreadIdsMap: Map[String, Set[String]],
- newPartMap: Map[String, Seq[String]],
- oldPartMap: Map[String, Seq[String]],
- newConsumerMap: Map[String,List[String]],
- oldConsumerMap: Map[String,List[String]]): Map[String, Set[String]] = {
- var relevantTopicThreadIdsMap = new mutable.HashMap[String, Set[String]]()
- for ( (topic, consumerThreadIdSet) <- myTopicThreadIdsMap )
- if ( oldPartMap.get(topic) != newPartMap.get(topic) || oldConsumerMap.get(topic) != newConsumerMap.get(topic))
- relevantTopicThreadIdsMap += (topic -> consumerThreadIdSet)
- relevantTopicThreadIdsMap
+ private def releasePartitionOwnership(localTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]])= {
+ info("Releasing partition ownership")
+ for ((topic, infos) <- localTopicRegistry) {
+ for(partition <- infos.keys)
+ deletePartitionOwnershipFromZK(topic, partition.toString)
+ localTopicRegistry.remove(topic)
+ }
}
def resetState() {
topicRegistry.clear
- oldConsumersPerTopicMap.clear
- oldPartitionsPerTopicMap.clear
}
def syncedRebalance() {
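The hunk above moves rebalancing off the ZooKeeper watcher thread: handleChildChange now only sets a flag and signals a condition, while a dedicated watcher-executor thread coalesces triggers and wakes once a second so it can also observe the shutdown flag. Below is a minimal, self-contained sketch of that pattern; the names are illustrative and this is not code from the commit.

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock

class DeferredWorker(work: () => Unit) {
  private val shuttingDown = new AtomicBoolean(false)
  private val lock = new ReentrantLock
  private val cond = lock.newCondition()
  private var triggered = false

  private val executor = new Thread("deferred-worker") {
    override def run() {
      while (!shuttingDown.get) {
        lock.lock()
        val doWork =
          try {
            if (!triggered)
              cond.await(1000, TimeUnit.MILLISECONDS) // wake periodically to check the shutdown flag
            val t = triggered
            triggered = false
            t
          } finally {
            lock.unlock()
          }
        if (doWork)
          work() // run outside the lock, as syncedRebalance is above
      }
    }
  }
  executor.start()

  // called from watcher callbacks; several triggers arriving before the
  // worker wakes collapse into a single run of work()
  def trigger() {
    lock.lock()
    try {
      triggered = true
      cond.signalAll()
    } finally {
      lock.unlock()
    }
  }

  def shutdown() { shuttingDown.set(true) }
}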
@@ -422,8 +453,6 @@ private[kafka] class ZookeeperConsumerCo
* the value of a child. Just let this go since another rebalance will be triggered.
**/
info("exception during rebalance ", e)
- /* Explicitly make sure another rebalancing attempt will get triggered. */
- done = false
}
info("end rebalancing consumer " + consumerIdString + " try #" + i)
if (done) {
@@ -432,15 +461,9 @@ private[kafka] class ZookeeperConsumerCo
/* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
* clear the cache */
info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
- oldConsumersPerTopicMap.clear()
- oldPartitionsPerTopicMap.clear()
}
- // commit offsets
- commitOffsets()
// stop all fetchers and clear all the queues to avoid data duplication
- closeFetchersForQueues(cluster, kafkaMessageStreams, queues.map(q => q._2))
- // release all partitions, reset state and retry
- releasePartitionOwnership()
+ closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
Thread.sleep(config.rebalanceBackoffMs)
}
}
@@ -449,17 +472,9 @@ private[kafka] class ZookeeperConsumerCo
}
private def rebalance(cluster: Cluster): Boolean = {
- val myTopicThreadIdsMap = getTopicCount(zkClient, group, consumerIdString).getConsumerThreadIdsPerTopic
+ val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val partitionsPerTopicMap = getPartitionsForTopics(zkClient, myTopicThreadIdsMap.keys.iterator)
- val relevantTopicThreadIdsMap = getRelevantTopicMap(myTopicThreadIdsMap, partitionsPerTopicMap, oldPartitionsPerTopicMap, consumersPerTopicMap, oldConsumersPerTopicMap)
- if (relevantTopicThreadIdsMap.size <= 0) {
- info("Consumer %s with %s and topic partitions %s doesn't need to rebalance.".
- format(consumerIdString, consumersPerTopicMap, partitionsPerTopicMap))
- debug("Partitions per topic cache " + oldPartitionsPerTopicMap)
- debug("Consumers per topic cache " + oldConsumersPerTopicMap)
- return true
- }
/**
* fetchers must be stopped to avoid data duplication, since if the current
@@ -467,14 +482,15 @@ private[kafka] class ZookeeperConsumerCo
* But if we don't stop the fetchers first, this consumer would continue returning data for released
* partitions in parallel. So, not stopping the fetchers leads to duplicate data.
*/
- closeFetchers(cluster, kafkaMessageStreams, relevantTopicThreadIdsMap)
+ closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
- releasePartitionOwnership()
+ releasePartitionOwnership(topicRegistry)
var partitionOwnershipDecision = new collection.mutable.HashMap[(String, String), String]()
- for ((topic, consumerThreadIdSet) <- relevantTopicThreadIdsMap) {
- topicRegistry.remove(topic)
- topicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
+ var currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
+
+ for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
+ currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
val topicDirs = new ZKGroupTopicDirs(group, topic)
val curConsumers = consumersPerTopicMap.get(topic).get
@@ -502,11 +518,9 @@ private[kafka] class ZookeeperConsumerCo
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
- val ownPartition = processPartition(topicDirs, partition, topic, consumerThreadId)
- if (!ownPartition)
- return false
- else // record the partition ownership decision
- partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+ addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
+ // record the partition ownership decision
+ partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
}
}
@@ -520,21 +534,21 @@ private[kafka] class ZookeeperConsumerCo
info("Updating the cache")
debug("Partitions per topic cache " + partitionsPerTopicMap)
debug("Consumers per topic cache " + consumersPerTopicMap)
- oldPartitionsPerTopicMap = partitionsPerTopicMap
- oldConsumersPerTopicMap = consumersPerTopicMap
- updateFetcher(cluster, kafkaMessageStreams)
+ topicRegistry = currentTopicRegistry
+ updateFetcher(cluster)
true
- } else
+ } else {
false
+ }
}
private def closeFetchersForQueues(cluster: Cluster,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ messageStreams: Map[String,List[KafkaStream[_]]],
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
case Some(f) => f.stopConnectionsToAllBrokers
- f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, kafkaMessageStreams)
+ f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
info("Committing all offsets after clearing the fetcher queues")
/**
* here, we need to commit offsets before stopping the consumer from returning any more messages
@@ -549,16 +563,15 @@ private[kafka] class ZookeeperConsumerCo
}
}
- private def closeFetchers(cluster: Cluster, kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]],
+ private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
// after this rebalancing attempt
- val queuesTobeCleared = queues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
- closeFetchersForQueues(cluster, kafkaMessageStreams, queuesTobeCleared)
+ val queuesTobeCleared = topicThreadIdAndQueues.filter(q => relevantTopicThreadIdsMap.contains(q._1._1)).map(q => q._2)
+ closeFetchersForQueues(cluster, messageStreams, queuesTobeCleared)
}
- private def updateFetcher[T](cluster: Cluster,
- kafkaMessageStreams: Map[String,List[KafkaMessageStream[T]]]) {
+ private def updateFetcher(cluster: Cluster) {
// update partitions for fetcher
var allPartitionInfos : List[PartitionTopicInfo] = Nil
for (partitionInfos <- topicRegistry.values)
@@ -569,33 +582,13 @@ private[kafka] class ZookeeperConsumerCo
fetcher match {
case Some(f) =>
- f.startConnections(allPartitionInfos, cluster, kafkaMessageStreams)
+ f.startConnections(allPartitionInfos, cluster)
case None =>
}
}
- private def processPartition(topicDirs: ZKGroupTopicDirs, partition: String,
- topic: String, consumerThreadId: String) : Boolean = {
- val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- // check if some other consumer owns this partition at this time
- val currentPartitionOwner = readDataMaybeNull(zkClient, partitionOwnerPath)
- if(currentPartitionOwner != null) {
- if(currentPartitionOwner.equals(consumerThreadId)) {
- info(partitionOwnerPath + " exists with value " + currentPartitionOwner + " during connection loss; this is ok")
- addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
- true
- }
- else {
- info(partitionOwnerPath + " exists with value " + currentPartitionOwner)
- false
- }
- } else {
- addPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId)
- true
- }
- }
-
private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, String), String]): Boolean = {
+ var successfullyOwnedPartitions : List[(String, String)] = Nil
val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
val topic = partitionOwner._1._1
val partition = partitionOwner._1._2
@@ -604,6 +597,7 @@ private[kafka] class ZookeeperConsumerCo
try {
createEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId)
info(consumerThreadId + " successfully owned partition " + partition + " for topic " + topic)
+ successfullyOwnedPartitions ::= (topic, partition)
true
} catch {
case e: ZkNodeExistsException =>
@@ -613,14 +607,20 @@ private[kafka] class ZookeeperConsumerCo
case e2 => throw e2
}
}
- val success = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => if(decision) 0 else 1)
- if(success > 0) false /* even if one of the partition ownership attempt has failed, return false */
+ val hasPartitionOwnershipFailed = partitionOwnershipSuccessful.foldLeft(0)((sum, decision) => sum + (if(decision) 0 else 1))
+ /* even if one of the partition ownership attempts has failed, return false */
+ if(hasPartitionOwnershipFailed > 0) {
+ // remove all paths that we have owned in ZK
+ successfullyOwnedPartitions.foreach(topicAndPartition => deletePartitionOwnershipFromZK(topicAndPartition._1, topicAndPartition._2))
+ false
+ }
else true
}
- private def addPartitionTopicInfo(topicDirs: ZKGroupTopicDirs, partition: String,
+ private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
+ topicDirs: ZKGroupTopicDirs, partition: String,
topic: String, consumerThreadId: String) {
- val partTopicInfoMap = topicRegistry.get(topic)
+ val partTopicInfoMap = currentTopicRegistry.get(topic)
// find the leader for this partition
val leaderOpt = getLeaderForPartition(zkClient, topic, partition.toInt)
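The reflectPartitionOwnershipDecision change above makes partition claiming all-or-nothing: every claim creates an ephemeral znode, and if any claim fails with ZkNodeExistsException the znodes created so far are deleted so the next rebalance starts from a clean slate. A hedged sketch of that claim-then-roll-back shape, using the raw ZkClient API rather than the ZkUtils helpers (paths and names are illustrative):

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Try to claim every path; on any conflict, release what was claimed so far.
def claimAll(zk: ZkClient, owner: String, paths: List[String]): Boolean = {
  var claimed: List[String] = Nil
  val results = paths.map { path =>
    try {
      zk.createEphemeral(path, owner) // a conflicting claim throws ZkNodeExistsException
      claimed ::= path
      true
    } catch {
      case e: ZkNodeExistsException => false // another consumer owns it
    }
  }
  if (results.contains(false)) {
    claimed.foreach(p => zk.delete(p)) // roll back partial ownership
    false
  } else
    true
}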
@@ -646,8 +646,7 @@ private[kafka] class ZookeeperConsumerCo
}
else
offset = offsetString.toLong
-
- val queue = queues.get((topic, consumerThreadId))
+ val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
@@ -661,5 +660,155 @@ private[kafka] class ZookeeperConsumerCo
debug(partTopicInfo + " selected new offset " + offset)
}
}
+
+ private def reinitializeConsumer[T](
+ topicCount: TopicCount,
+ queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[T])]) {
+
+ val dirs = new ZKGroupDirs(config.groupId)
+
+ // listener to consumer and partition changes
+ if (loadBalancerListener == null) {
+ val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[T]]]
+ loadBalancerListener = new ZKRebalancerListener(
+ config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_]]]])
+ }
+
+ // register listener for session expired event
+ if (sessionExpirationListener == null)
+ sessionExpirationListener = new ZKSessionExpireListener(
+ dirs, consumerIdString, topicCount, loadBalancerListener)
+
+ val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams
+
+ // map of {topic -> Set(thread-1, thread-2, ...)}
+ val consumerThreadIdsPerTopic: Map[String, Set[String]] =
+ topicCount.getConsumerThreadIdsPerTopic
+
+ /*
+ * This usage of map flatten breaks up consumerThreadIdsPerTopic into
+ * a set of (topic, thread-id) pairs that we then use to construct
+ * the updated (topic, thread-id) -> queues map
+ */
+ implicit def getTopicThreadIds(v: (String, Set[String])): Set[(String, String)] = v._2.map((v._1, _))
+
+ // iterator over (topic, thread-id) tuples
+ val topicThreadIds: Iterable[(String, String)] =
+ consumerThreadIdsPerTopic.flatten
+
+ // list of (pairs of pairs): e.g., ((topic, thread-id),(queue, stream))
+ val threadQueueStreamPairs = topicCount match {
+ case wildTopicCount: WildcardTopicCount =>
+ for (tt <- topicThreadIds; qs <- queuesAndStreams) yield (tt -> qs)
+ case statTopicCount: StaticTopicCount => {
+ require(topicThreadIds.size == queuesAndStreams.size,
+ "Mismatch between thread ID count (%d) and queue count (%d)".format(
+ topicThreadIds.size, queuesAndStreams.size))
+ topicThreadIds.zip(queuesAndStreams)
+ }
+ }
+
+ threadQueueStreamPairs.foreach(e => {
+ val topicThreadId = e._1
+ val q = e._2._1
+ topicThreadIdAndQueues.put(topicThreadId, q)
+ })
+
+ val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
+ groupedByTopic.foreach(e => {
+ val topic = e._1
+ val streams = e._2.map(_._2._2).toList
+ topicStreamsMap += (topic -> streams)
+ debug("adding topic %s and %d streams to map.".format(topic, streams.size))
+ })
+
+ // listener to consumer and partition changes
+ zkClient.subscribeStateChanges(sessionExpirationListener)
+
+ zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
+
+ topicStreamsMap.foreach { topicAndStreams =>
+ // register on broker partition path changes
+ val partitionPath = BrokerTopicsPath + "/" + topicAndStreams._1
+ zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
+ }
+
+ // explicitly trigger load balancing for this consumer
+ loadBalancerListener.syncedRebalance()
+ }
+
+ class WildcardStreamsHandler[T](topicFilter: TopicFilter,
+ numStreams: Int,
+ decoder: Decoder[T])
+ extends TopicEventHandler[String] {
+
+ if (messageStreamCreated.getAndSet(true))
+ throw new RuntimeException("Each consumer connector can create " +
+ "message streams by filter at most once.")
+
+ private val wildcardQueuesAndStreams = (1 to numStreams)
+ .map(e => {
+ val queue = new LinkedBlockingQueue[FetchedDataChunk](config.maxQueuedChunks)
+ val stream = new KafkaStream[T](
+ queue, config.consumerTimeoutMs, decoder, config.enableShallowIterator)
+ (queue, stream)
+ }).toList
+
+ // bootstrap with existing topics
+ private var wildcardTopics =
+ getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+ .filter(topicFilter.isTopicAllowed)
+
+ private val wildcardTopicCount = TopicCount.constructTopicCount(
+ consumerIdString, topicFilter, numStreams, zkClient)
+
+ val dirs = new ZKGroupDirs(config.groupId)
+ registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
+ reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+
+ if (!topicFilter.requiresTopicEventWatcher) {
+ info("Not creating event watcher for trivial whitelist " + topicFilter)
+ }
+ else {
+ info("Creating topic event watcher for whitelist " + topicFilter)
+ wildcardTopicWatcher = new ZookeeperTopicEventWatcher(config, this)
+
+ /*
+ * Topic events will trigger subsequent synced rebalances. Also, the
+ * consumer will get registered only after an allowed topic becomes
+ * available.
+ */
+ }
+
+ def handleTopicEvent(allTopics: Seq[String]) {
+ debug("Handling topic event")
+
+ val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+
+ val addedTopics = updatedTopics filterNot (wildcardTopics contains)
+ if (addedTopics.nonEmpty)
+ info("Topic event: added topics = %s"
+ .format(addedTopics))
+
+ /*
+ * TODO: Deleted topics are interesting (and will not be a concern until
+ * 0.8 release). We may need to remove these topics from the rebalance
+ * listener's map in reinitializeConsumer.
+ */
+ val deletedTopics = wildcardTopics filterNot (updatedTopics contains)
+ if (deletedTopics.nonEmpty)
+ info("Topic event: deleted topics = %s"
+ .format(deletedTopics))
+
+ wildcardTopics = updatedTopics
+ info("Topics to consume = %s".format(wildcardTopics))
+
+ if (addedTopics.nonEmpty || deletedTopics.nonEmpty)
+ reinitializeConsumer(wildcardTopicCount, wildcardQueuesAndStreams)
+ }
+
+ def streams: Seq[KafkaStream[T]] =
+ wildcardQueuesAndStreams.map(_._2)
+ }
}
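Taken together, the WildcardStreamsHandler and createMessageStreamsByFilter additions above let a consumer subscribe by pattern rather than by an explicit topic map. A hedged usage sketch, assuming the Whitelist implementation of TopicFilter and 0.8-era property names; the ZooKeeper address, group id, and regex are made up:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
import kafka.serializer.DefaultDecoder

object WildcardConsumeExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zk.connect", "localhost:2181") // assumed ZooKeeper address
    props.put("groupid", "wildcard-example")  // assumed 0.8-era property name
    val connector = Consumer.create(new ConsumerConfig(props))
    // one stream shared across every topic matching the whitelist regex
    val streams = connector.createMessageStreamsByFilter(
      new Whitelist("events-.*"), 1, new DefaultDecoder)
    for (stream <- streams; messageAndMetadata <- stream)
      println("consumed: " + messageAndMetadata.message)
  }
}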
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperTopicEventWatcher.scala Thu May 31 01:51:23 2012
@@ -21,11 +21,9 @@ import scala.collection.JavaConversions.
import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
import org.apache.zookeeper.Watcher.Event.KeeperState
-import kafka.server.KafkaServerStartable
-import kafka.common.ConsumerRebalanceFailedException
class ZookeeperTopicEventWatcher(val config:ConsumerConfig,
- val eventHandler: TopicEventHandler[String], kafkaServerStartable: KafkaServerStartable) extends Logging {
+ val eventHandler: TopicEventHandler[String]) extends Logging {
val lock = new Object()
@@ -35,7 +33,7 @@ class ZookeeperTopicEventWatcher(val con
startWatchingTopicEvents()
private def startWatchingTopicEvents() {
- val topicEventListener = new ZkTopicEventListener(kafkaServerStartable)
+ val topicEventListener = new ZkTopicEventListener()
ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.BrokerTopicsPath)
zkClient.subscribeStateChanges(
@@ -52,6 +50,7 @@ class ZookeeperTopicEventWatcher(val con
def shutdown() {
lock.synchronized {
+ info("Shutting down topic event watcher.")
if (zkClient != null) {
stopWatchingTopicEvents()
zkClient.close()
@@ -62,7 +61,7 @@ class ZookeeperTopicEventWatcher(val con
}
}
- class ZkTopicEventListener(val kafkaServerStartable: KafkaServerStartable) extends IZkChildListener {
+ class ZkTopicEventListener extends IZkChildListener {
@throws(classOf[Exception])
def handleChildChange(parent: String, children: java.util.List[String]) {
@@ -76,11 +75,8 @@ class ZookeeperTopicEventWatcher(val con
}
}
catch {
- case e: ConsumerRebalanceFailedException =>
- fatal("can't rebalance in embedded consumer; proceed to shutdown", e)
- kafkaServerStartable.shutdown()
case e =>
- error("error in handling child changes in embedded consumer", e)
+ error("error in handling child changes", e)
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java Thu May 31 01:51:23 2012
@@ -17,34 +17,53 @@
package kafka.javaapi.consumer;
-import kafka.consumer.KafkaMessageStream;
-import kafka.message.Message;
-import kafka.serializer.Decoder;
import java.util.List;
import java.util.Map;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.TopicFilter;
+import kafka.message.Message;
+import kafka.serializer.Decoder;
public interface ConsumerConnector {
- /**
- * Create a list of MessageStreams of type T for each topic.
- *
- * @param topicCountMap a map of (topic, #streams) pair
- * @param decoder a decoder that converts from Message to T
- * @return a map of (topic, list of KafkaMessageStream) pair. The number of items in the
- * list is #streams. Each KafkaMessageStream supports an iterator of messages.
- */
- public <T> Map<String, List<KafkaMessageStream<T>>> createMessageStreams(
- Map<String, Integer> topicCountMap, Decoder<T> decoder);
- public Map<String, List<KafkaMessageStream<Message>>> createMessageStreams(
- Map<String, Integer> topicCountMap);
-
- /**
- * Commit the offsets of all broker partitions connected by this connector.
- */
- public void commitOffsets();
-
- /**
- * Shut down the connector
- */
- public void shutdown();
+ /**
+ * Create a list of MessageStreams of type T for each topic.
+ *
+ * @param topicCountMap a map of (topic, #streams) pair
+ * @param decoder a decoder that converts from Message to T
+ * @return a map of (topic, list of KafkaStream) pairs.
+ * The number of items in the list is #streams. Each stream supports
+ * an iterator over message/metadata pairs.
+ */
+ public <T> Map<String, List<KafkaStream<T>>> createMessageStreams(
+ Map<String, Integer> topicCountMap, Decoder<T> decoder);
+ public Map<String, List<KafkaStream<Message>>> createMessageStreams(
+ Map<String, Integer> topicCountMap);
+
+ /**
+ * Create a list of MessageAndTopicStreams containing messages of type T.
+ *
+ * @param topicFilter a TopicFilter that specifies which topics to
+ * subscribe to (encapsulates a whitelist or a blacklist).
+ * @param numStreams the number of message streams to return.
+ * @param decoder a decoder that converts from Message to T
+ * @return a list of KafkaStream. Each stream supports an
+ * iterator over its MessageAndMetadata elements.
+ */
+ public <T> List<KafkaStream<T>> createMessageStreamsByFilter(
+ TopicFilter topicFilter, int numStreams, Decoder<T> decoder);
+ public List<KafkaStream<Message>> createMessageStreamsByFilter(
+ TopicFilter topicFilter, int numStreams);
+ public List<KafkaStream<Message>> createMessageStreamsByFilter(
+ TopicFilter topicFilter);
+
+ /**
+ * Commit the offsets of all broker partitions connected by this connector.
+ */
+ public void commitOffsets();
+
+ /**
+ * Shut down the connector
+ */
+ public void shutdown();
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala Thu May 31 01:51:23 2012
@@ -16,9 +16,11 @@
*/
package kafka.javaapi.consumer
-import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
import kafka.message.Message
import kafka.serializer.{DefaultDecoder, Decoder}
+import kafka.consumer._
+import scala.collection.JavaConversions.asList
+
/**
* This class handles the consumer's interaction with zookeeper
@@ -68,14 +70,14 @@ private[kafka] class ZookeeperConsumerCo
def createMessageStreams[T](
topicCountMap: java.util.Map[String,java.lang.Integer],
decoder: Decoder[T])
- : java.util.Map[String,java.util.List[KafkaMessageStream[T]]] = {
+ : java.util.Map[String,java.util.List[KafkaStream[T]]] = {
import scala.collection.JavaConversions._
val scalaTopicCountMap: Map[String, Int] = Map.empty[String, Int] ++ asMap(topicCountMap.asInstanceOf[java.util.Map[String, Int]])
val scalaReturn = underlying.consume(scalaTopicCountMap, decoder)
- val ret = new java.util.HashMap[String,java.util.List[KafkaMessageStream[T]]]
+ val ret = new java.util.HashMap[String,java.util.List[KafkaStream[T]]]
for ((topic, streams) <- scalaReturn) {
- var javaStreamList = new java.util.ArrayList[KafkaMessageStream[T]]
+ var javaStreamList = new java.util.ArrayList[KafkaStream[T]]
for (stream <- streams)
javaStreamList.add(stream)
ret.put(topic, javaStreamList)
@@ -85,9 +87,17 @@ private[kafka] class ZookeeperConsumerCo
def createMessageStreams(
topicCountMap: java.util.Map[String,java.lang.Integer])
- : java.util.Map[String,java.util.List[KafkaMessageStream[Message]]] =
+ : java.util.Map[String,java.util.List[KafkaStream[Message]]] =
createMessageStreams(topicCountMap, new DefaultDecoder)
+ def createMessageStreamsByFilter[T](topicFilter: TopicFilter, numStreams: Int, decoder: Decoder[T]) =
+ asList(underlying.createMessageStreamsByFilter(topicFilter, numStreams, decoder))
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter, numStreams: Int) =
+ createMessageStreamsByFilter(topicFilter, numStreams, new DefaultDecoder)
+
+ def createMessageStreamsByFilter(topicFilter: TopicFilter) =
+ createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder)
def commitOffsets() {
underlying.commitOffsets
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/MessageSet.scala Thu May 31 01:51:23 2012
@@ -17,8 +17,10 @@
package kafka.javaapi.message
+
import kafka.message.{MessageAndOffset, InvalidMessageException}
+
/**
* A set of messages. A message set has a fixed serialized form, though the container
* for the bytes could be either in-memory or on disk. The format of each message is
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu May 31 01:51:23 2012
@@ -25,6 +25,7 @@ import kafka.utils._
import kafka.common._
import kafka.api.OffsetRequest
import java.util._
+import kafka.server.BrokerTopicStat
private[kafka] object Log {
val FileSuffix = ".kafka"
@@ -199,7 +200,7 @@ private[kafka] class Log(val dir: File,
* Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
* Returns the offset at which the messages are written.
*/
- def append(messages: MessageSet): Unit = {
+ def append(messages: ByteBufferMessageSet): Unit = {
// validate the messages
var numberOfMessages = 0
for(messageAndOffset <- messages) {
@@ -208,13 +209,25 @@ private[kafka] class Log(val dir: File,
numberOfMessages += 1;
}
+ BrokerTopicStat.getBrokerTopicStat(getTopicName).recordMessagesIn(numberOfMessages)
+ BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
logStats.recordAppendedMessages(numberOfMessages)
-
+
+ // truncate the message set's buffer up to validBytes, before appending it to the on-disk log
+ val validByteBuffer = messages.getBuffer.duplicate()
+ val messageSetValidBytes = messages.validBytes
+ if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
+ throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes +
+ " Message set cannot be appended to log. Possible causes are corrupted produce requests")
+
+ validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+ val validMessages = new ByteBufferMessageSet(validByteBuffer)
+
// they are valid, insert them in the log
lock synchronized {
try {
val segment = segments.view.last
- segment.messageSet.append(messages)
+ segment.messageSet.append(validMessages)
maybeFlush(numberOfMessages)
maybeRoll(segment)
}
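The append path above now guards against corrupted produce requests by slicing the incoming buffer down to validBytes before anything reaches the on-disk segment. The essential buffer manipulation, as a standalone hedged sketch:

import java.nio.ByteBuffer

// Duplicate the buffer (leaving the caller's view untouched), then cap the
// limit at validBytes so trailing partial or corrupt bytes are never appended.
def truncateToValid(buffer: ByteBuffer, validBytes: Long): ByteBuffer = {
  require(validBytes >= 0 && validBytes <= Int.MaxValue,
    "Illegal length of message set " + validBytes)
  val valid = buffer.duplicate()
  valid.limit(validBytes.asInstanceOf[Int])
  valid
}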
@@ -247,10 +260,18 @@ private[kafka] class Log(val dir: File,
val deletable = view.takeWhile(predicate)
for(seg <- deletable)
seg.deleted = true
- val numToDelete = deletable.size
+ var numToDelete = deletable.size
// if we are deleting everything, create a new empty segment
- if(numToDelete == view.size)
- roll()
+ if(numToDelete == view.size) {
+ if (view(numToDelete - 1).size > 0)
+ roll()
+ else {
+ // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
+ // file name. So simply reuse the last segment and reset the modified time.
+ view(numToDelete - 1).file.setLastModified(SystemTime.milliseconds)
+ numToDelete -=1
+ }
+ }
segments.trunc(numToDelete)
}
}
@@ -288,9 +309,12 @@ private[kafka] class Log(val dir: File,
*/
def roll() {
lock synchronized {
- val last = segments.view.last
val newOffset = nextAppendOffset
val newFile = new File(dir, Log.nameFromOffset(newOffset))
+ if (newFile.exists) {
+ warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
+ newFile.delete()
+ }
debug("Rolling log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Thu May 31 01:51:23 2012
@@ -18,10 +18,10 @@
package kafka.message
import kafka.utils.Logging
-import kafka.common.{InvalidMessageSizeException, ErrorMapping}
import java.nio.ByteBuffer
import java.nio.channels._
import kafka.utils.IteratorTemplate
+import kafka.common.{MessageSizeTooLargeException, InvalidMessageSizeException, ErrorMapping}
/**
* A sequence of messages stored in a byte buffer
@@ -36,8 +36,9 @@ import kafka.utils.IteratorTemplate
class ByteBufferMessageSet(private val buffer: ByteBuffer,
private val initialOffset: Long = 0L,
private val errorCode: Int = ErrorMapping.NoError) extends MessageSet with Logging {
- private var validByteCount = -1L
private var shallowValidByteCount = -1L
+ if(sizeInBytes > Int.MaxValue)
+ throw new InvalidMessageSizeException("Message set cannot be larger than " + Int.MaxValue)
def this(compressionCodec: CompressionCodec, messages: Message*) {
this(MessageSet.createByteBuffer(compressionCodec, messages:_*), 0L, ErrorMapping.NoError)
@@ -59,7 +60,7 @@ class ByteBufferMessageSet(private val b
private def shallowValidBytes: Long = {
if(shallowValidByteCount < 0) {
- val iter = deepIterator
+ val iter = this.internalIterator(true)
while(iter.hasNext) {
val messageAndOffset = iter.next
shallowValidByteCount = messageAndOffset.offset
@@ -70,12 +71,31 @@ class ByteBufferMessageSet(private val b
}
/** Write the messages in this set to the given channel */
- def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long =
- channel.write(buffer.duplicate)
-
- override def iterator: Iterator[MessageAndOffset] = deepIterator
+ def writeTo(channel: GatheringByteChannel, offset: Long, size: Long): Long = {
+ buffer.mark()
+ val written = channel.write(buffer)
+ buffer.reset()
+ written
+ }
+
+ /** default iterator that iterates over decompressed messages */
+ override def iterator: Iterator[MessageAndOffset] = internalIterator()
+
+ /** iterator over compressed messages without decompressing */
+ def shallowIterator: Iterator[MessageAndOffset] = internalIterator(true)
+
+ def verifyMessageSize(maxMessageSize: Int){
+ var shallowIter = internalIterator(true)
+ while(shallowIter.hasNext){
+ var messageAndOffset = shallowIter.next
+ val payloadSize = messageAndOffset.message.payloadSize
+ if ( payloadSize > maxMessageSize)
+ throw new MessageSizeTooLargeException("payload size of " + payloadSize + " larger than " + maxMessageSize)
+ }
+ }
- private def deepIterator(): Iterator[MessageAndOffset] = {
+ /** When the isShallow flag is true, we do a shallow iteration: just traverse the first level of messages. This is used in the verifyMessageSize() function **/
+ private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageAndOffset] {
var topIter = buffer.slice()
@@ -106,33 +126,50 @@ class ByteBufferMessageSet(private val b
message.limit(size)
topIter.position(topIter.position + size)
val newMessage = new Message(message)
- newMessage.compressionCodec match {
- case NoCompressionCodec =>
- debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
- innerIter = null
- currValidBytes += 4 + size
- trace("currValidBytes = " + currValidBytes)
- new MessageAndOffset(newMessage, currValidBytes)
- case _ =>
- debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
- innerIter = CompressionUtils.decompress(newMessage).deepIterator
- if (!innerIter.hasNext) {
- currValidBytes += 4 + lastMessageSize
+ if(!newMessage.isValid)
+ throw new InvalidMessageException("message is invalid, compression codec: " + newMessage.compressionCodec
+ + " size: " + size + " curr offset: " + currValidBytes + " init offset: " + initialOffset)
+
+ if(isShallow){
+ currValidBytes += 4 + size
+ trace("shallow iterator currValidBytes = " + currValidBytes)
+ new MessageAndOffset(newMessage, currValidBytes)
+ }
+ else{
+ newMessage.compressionCodec match {
+ case NoCompressionCodec =>
+ debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
- }
- makeNext()
+ currValidBytes += 4 + size
+ trace("currValidBytes = " + currValidBytes)
+ new MessageAndOffset(newMessage, currValidBytes)
+ case _ =>
+ debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = CompressionUtils.decompress(newMessage).internalIterator()
+ if (!innerIter.hasNext) {
+ currValidBytes += 4 + lastMessageSize
+ innerIter = null
+ }
+ makeNext()
+ }
}
}
override def makeNext(): MessageAndOffset = {
- val isInnerDone = innerDone()
- isInnerDone match {
- case true => makeNextOuter
- case false => {
- val messageAndOffset = innerIter.next
- if (!innerIter.hasNext)
- currValidBytes += 4 + lastMessageSize
- new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ if(isShallow){
+ makeNextOuter
+ }
+ else{
+ val isInnerDone = innerDone()
+ debug("makeNext() in internalIterator: innerDone = " + isInnerDone)
+ isInnerDone match {
+ case true => makeNextOuter
+ case false => {
+ val messageAndOffset = innerIter.next
+ if (!innerIter.hasNext)
+ currValidBytes += 4 + lastMessageSize
+ new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ }
}
}
}
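The shallow/deep split above matters mainly for compressed sets: the default iterator decompresses each wrapper message and walks the messages inside it, while shallowIterator stops at the first level, which is all verifyMessageSize needs. A small hedged example of the difference:

import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

object ShallowVsDeep extends App {
  val messages = new ByteBufferMessageSet(GZIPCompressionCodec,
    new Message("one".getBytes), new Message("two".getBytes))
  // deep (default) iteration decompresses: two logical messages
  println(messages.iterator.size)        // 2
  // shallow iteration sees only the single compressed wrapper message
  println(messages.shallowIterator.size) // 1
}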
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala Thu May 31 01:51:23 2012
@@ -49,7 +49,7 @@ class GZIPCompression(inputStream: Input
}
override def read(a: Array[Byte]): Int = {
- gzipIn.read(a)
+ gzipIn.read(a)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/InvalidMessageException.scala Thu May 31 01:51:23 2012
@@ -20,4 +20,6 @@ package kafka.message
/**
* Indicates that a message failed its checksum and is corrupt
*/
-class InvalidMessageException extends RuntimeException
+class InvalidMessageException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala Thu May 31 01:51:23 2012
@@ -13,11 +13,10 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.message
-/**
- * Represents message and offset of the next message. This is used in the MessageSet to iterate over it
- */
-case class MessageAndOffset(val message: Message, val offset: Long)
+
+case class MessageAndOffset(message: Message, offset: Long)
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala Thu May 31 01:51:23 2012
@@ -36,7 +36,7 @@ object ConsoleProducer {
.withRequiredArg
.describedAs("connection_string")
.ofType(classOf[String])
- val asyncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.")
+ val syncOpt = parser.accepts("sync", "If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.")
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
.withRequiredArg
@@ -78,7 +78,7 @@ object ConsoleProducer {
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
- val async = options.has(asyncOpt)
+ val sync = options.has(syncOpt)
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
@@ -89,10 +89,10 @@ object ConsoleProducer {
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
- props.put("producer.type", if(async) "async" else "sync")
+ props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
- props.put("batch.size", batchSize)
- props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+ props.put("batch.size", batchSize.toString)
+ props.put("queue.time", sendTimeout.toString)
props.put("serializer.class", encoderClass)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala Thu May 31 01:51:23 2012
@@ -22,9 +22,7 @@ import org.apache.log4j.spi.LoggingEvent
import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.helpers.LogLog
import kafka.utils.Logging
-import kafka.serializer.Encoder
import java.util.{Properties, Date}
-import kafka.message.Message
import scala.collection._
class KafkaLog4jAppender extends AppenderSkeleton with Logging {
@@ -94,7 +92,3 @@ class KafkaLog4jAppender extends Appende
override def requiresLayout: Boolean = false
}
-
-class DefaultStringEncoder extends Encoder[LoggingEvent] {
- override def toMessage(event: LoggingEvent):Message = new Message(event.getMessage.asInstanceOf[String].getBytes)
-}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ProducerConfig.scala Thu May 31 01:51:23 2012
@@ -32,10 +32,24 @@ class ProducerConfig(val props: Properti
if(brokerList != null)
throw new InvalidConfigException("broker.list is deprecated. Use zk.connect instead")
+ /**
+ * If DefaultEventHandler is used, this specifies the number of times to
+ * retry if an error is encountered during send. Currently, it is only
+ * appropriate when broker.list points to a VIP. If the zk.connect option
+ * is used instead, this will not have any effect because with the zk-based
+ * producer, brokers are not re-selected upon retry. So retries would go to
+ * the same (potentially still down) broker. (KAFKA-253 will help address
+ * this.)
+ */
+ val numRetries = Utils.getInt(props, "num.retries", 0)
+
/** If both broker.list and zk.connect options are specified, throw an exception */
if(zkConnect == null)
throw new InvalidConfigException("zk.connect property is required")
+ if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
+ throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+
/** the partitioner class for partitioning events amongst sub-topics */
val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu May 31 01:51:23 2012
@@ -18,10 +18,16 @@
package kafka.producer
import kafka.api._
-import kafka.common.MessageSizeTooLargeException
import kafka.message.MessageSet
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Request, Receive}
import kafka.utils._
+import java.util.Random
+import kafka.common.MessageSizeTooLargeException
+
+object SyncProducer {
+ val RequestKey: Short = 0
+ val randomGenerator = new Random
+}
/*
* Send a message set.
@@ -31,14 +37,22 @@ class SyncProducer(val config: SyncProdu
private val MaxConnectBackoffMs = 60000
private var sentOnConnection = 0
+ /** make time-based reconnect start at a random point in the interval **/
+ private var lastConnectionTime = System.currentTimeMillis - SyncProducer.randomGenerator.nextDouble() * config.reconnectInterval
+
private val lock = new Object()
@volatile private var shutdown: Boolean = false
private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
- debug("Instantiating Scala Sync Producer")
+ trace("Instantiating Scala Sync Producer")
private def verifyRequest(request: Request) = {
- if (logger.isTraceEnabled) {
+ /**
+ * This seems a little convoluted, but the idea is to turn on verification simply by changing log4j settings
+ * Also, when verification is turned on, care should be taken to see that the logs don't fill up with unnecessary
+ * data. So the rest of the logging stays at TRACE, while errors are logged at ERROR level.
+ */
+ if (logger.isDebugEnabled) {
val buffer = new BoundedByteBufferSend(request).buffer
trace("verifying sendbuffer of size " + buffer.limit)
val requestTypeId = buffer.getShort()
@@ -71,9 +85,11 @@ class SyncProducer(val config: SyncProdu
}
// TODO: do we still need this?
sentOnConnection += 1
- if(sentOnConnection >= config.reconnectInterval) {
+
+ if(sentOnConnection >= config.reconnectInterval || (config.reconnectTimeInterval >= 0 && System.currentTimeMillis - lastConnectionTime >= config.reconnectTimeInterval)) {
reconnect()
sentOnConnection = 0
+ lastConnectionTime = System.currentTimeMillis
}
SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
response
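
The reconnect condition above now fires either after reconnect.interval sends or after reconnect.time.interval.ms milliseconds, whichever comes first, and the clock is seeded at a random offset so that many producers started together do not all reconnect at once. A standalone sketch of that policy (class and method names are illustrative, not the committed API):

import java.util.Random

// Illustrative sketch of the count-or-time reconnect decision.
class ReconnectPolicy(reconnectInterval: Int, reconnectTimeIntervalMs: Long) {
  private val rand = new Random
  private var sentOnConnection = 0
  // seed at a random point in the window to stagger reconnects across producers
  private var lastConnectionTime =
    System.currentTimeMillis - (rand.nextDouble() * reconnectTimeIntervalMs).toLong

  /** returns true when the caller should reconnect now */
  def onSend(): Boolean = {
    sentOnConnection += 1
    val timedOut = reconnectTimeIntervalMs >= 0 &&
      System.currentTimeMillis - lastConnectionTime >= reconnectTimeIntervalMs
    if (sentOnConnection >= reconnectInterval || timedOut) {
      sentOnConnection = 0
      lastConnectionTime = System.currentTimeMillis
      true
    } else
      false
  }
}
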
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Thu May 31 01:51:23 2012
@@ -40,6 +40,9 @@ trait SyncProducerConfigShared {
val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
+ /** a negative reconnect time interval disables the time-based reconnect feature */
+ var reconnectTimeInterval = Utils.getInt(props, "reconnect.time.interval.ms", 1000*1000*10)
+
val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
/* the client application sending the producer requests */
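
For example (values are placeholders), the new property sits alongside the existing count-based one:

import java.util.Properties

// Illustrative configuration of the time-based reconnect; values are made up.
object ReconnectConfigExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("reconnect.interval", "30000")          // count-based reconnect, as before
    props.put("reconnect.time.interval.ms", "300000") // additionally reconnect every 5 minutes
    // props.put("reconnect.time.interval.ms", "-1")  // a negative value disables the time-based reconnect
    println(props)
  }
}
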
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu May 31 01:51:23 2012
@@ -70,10 +70,11 @@ class ProducerSendThread[K,V](val thread
trace("Dequeued item for topic %s, partition key: %s, data: %s"
.format(currentQueueItem.getTopic, currentQueueItem.getKey.toString, currentQueueItem.getData.toString))
events += currentQueueItem
-
- // check if the batch size is reached
- full = events.size >= batchSize
}
+
+ // check if the batch size is reached
+ full = events.size >= batchSize
+
if(full || expired) {
if(expired) debug(elapsed + " ms elapsed. Queue time reached. Sending..")
if(full) debug("Batch full. Sending..")
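
With the check moved out of the dequeue branch, the full flag is re-evaluated on every loop iteration, including ones where the poll timed out. A standalone sketch of the send-when-full-or-expired pattern (queue contents and timings are made up):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Illustrative batching loop: send when the batch is full or the queue time expires.
object BatchingSketch {
  def main(args: Array[String]) {
    val queue = new LinkedBlockingQueue[String]()
    val batchSize = 3
    val queueTimeMs = 200L
    for (i <- 1 to 4) queue.put("event-" + i)

    val events = new ArrayBuffer[String]
    val deadline = System.currentTimeMillis + queueTimeMs
    var full = false
    var expired = false
    while (!full && !expired) {
      val item = queue.poll(deadline - System.currentTimeMillis, TimeUnit.MILLISECONDS)
      if (item != null)
        events += item
      expired = (item == null)
      full = events.size >= batchSize  // checked every iteration, as in the change above
    }
    println("sending batch of " + events.size + ": " + events.mkString(", "))
  }
}
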
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu May 31 01:51:23 2012
@@ -96,7 +96,7 @@ class KafkaApis(val requestChannel: Requ
// TODO: need to handle ack's here! Will probably move to another method.
kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
- log.append(partitionData.messages)
+ log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
offsets(msgIndex) = log.nextAppendOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu May 31 01:51:23 2012
@@ -71,7 +71,7 @@ class KafkaConfig(props: Properties) ext
val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24 * 7, (1, Int.MaxValue))
/* the maximum size of the log before deleting it */
- val logRetentionSize = Utils.getInt(props, "log.retention.size", -1)
+ val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)
/* the number of hours to keep a log file before deleting it for some specific topic*/
val logRetentionHoursMap = Utils.getTopicRentionHours(Utils.getString(props, "topic.log.retention.hours", ""))
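
log.retention.size is a byte count, and byte counts beyond about 2.1 GB do not fit in an Int, hence the switch to getLong. A quick illustration of the overflow that a 32-bit read would cause:

// Why log.retention.size needs a Long: sizes over Int.MaxValue bytes overflow.
object RetentionSizeExample {
  def main(args: Array[String]) {
    val tenGb: Long = 10L * 1024 * 1024 * 1024
    println(tenGb)          // 10737418240 bytes
    println(tenGb.toInt)    // -2147483648: only the low 32 bits survive
    println(Int.MaxValue)   // 2147483647, i.e. about 2.1 GB
  }
}
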
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Thu May 31 01:51:23 2012
@@ -17,9 +17,9 @@
package kafka.server
-import org.apache.log4j._
import kafka.network._
import kafka.utils._
+import java.util.concurrent.atomic.AtomicLong
/**
* A thread that answers kafka requests.
@@ -60,3 +60,60 @@ class KafkaRequestHandlerPool(val reques
}
}
+
+trait BrokerTopicStatMBean {
+ def getMessagesIn: Long
+ def getBytesIn: Long
+ def getBytesOut: Long
+ def getFailedProduceRequest: Long
+ def getFailedFetchRequest: Long
+}
+
+@threadsafe
+class BrokerTopicStat extends BrokerTopicStatMBean {
+ private val numCumulatedMessagesIn = new AtomicLong(0)
+ private val numCumulatedBytesIn = new AtomicLong(0)
+ private val numCumulatedBytesOut = new AtomicLong(0)
+ private val numCumulatedFailedProduceRequests = new AtomicLong(0)
+ private val numCumulatedFailedFetchRequests = new AtomicLong(0)
+
+ def getMessagesIn: Long = numCumulatedMessagesIn.get
+
+ def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
+
+ def getBytesIn: Long = numCumulatedBytesIn.get
+
+ def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
+
+ def getBytesOut: Long = numCumulatedBytesOut.get
+
+ def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
+
+ def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
+
+ def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+
+ def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+
+ def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
+}
+
+object BrokerTopicStat extends Logging {
+ private val stats = new Pool[String, BrokerTopicStat]
+ private val allTopicStat = new BrokerTopicStat
+ Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
+
+ def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
+
+ def getBrokerTopicStat(topic: String): BrokerTopicStat = {
+ var stat = stats.get(topic)
+ if (stat == null) {
+ stat = new BrokerTopicStat
+ if (stats.putIfNotExists(topic, stat) == null)
+ Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
+ else
+ stat = stats.get(topic)
+ }
+ return stat
+ }
+}
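
A hypothetical call site for the new stats (the topic name is made up; in the broker the recording happens inside request handling):

// Hypothetical usage of BrokerTopicStat as defined above.
object BrokerTopicStatExample {
  def main(args: Array[String]) {
    val stat = BrokerTopicStat.getBrokerTopicStat("orders")  // registers the MBean on first use
    stat.recordMessagesIn(100)
    stat.recordBytesIn(4096L)
    BrokerTopicStat.getBrokerAllTopicStat().recordMessagesIn(100)  // keep the all-topic aggregate in step
    BrokerTopicStat.getBrokerAllTopicStat().recordBytesIn(4096L)
    println("orders messages in so far: " + stat.getMessagesIn)
  }
}
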
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServerStartable.scala Thu May 31 01:51:23 2012
@@ -17,36 +17,21 @@
package kafka.server
-import kafka.utils.{Utils, Logging}
-import kafka.consumer._
-import kafka.producer.{ProducerData, ProducerConfig, Producer}
-import kafka.message.Message
-import java.util.concurrent.CountDownLatch
-
-import scala.collection.Map
-
-class KafkaServerStartable(val serverConfig: KafkaConfig,
- val consumerConfig: ConsumerConfig,
- val producerConfig: ProducerConfig) extends Logging {
+import kafka.utils.Logging
+
+
+class KafkaServerStartable(val serverConfig: KafkaConfig) extends Logging {
private var server : KafkaServer = null
- private var embeddedConsumer : EmbeddedConsumer = null
init
- def this(serverConfig: KafkaConfig) = this(serverConfig, null, null)
-
private def init() {
server = new KafkaServer(serverConfig)
- if (consumerConfig != null)
- embeddedConsumer =
- new EmbeddedConsumer(consumerConfig, producerConfig, this)
}
def startup() {
try {
server.startup()
- if (embeddedConsumer != null)
- embeddedConsumer.startup()
}
catch {
case e =>
@@ -57,8 +42,6 @@ class KafkaServerStartable(val serverCon
def shutdown() {
try {
- if (embeddedConsumer != null)
- embeddedConsumer.shutdown()
server.shutdown()
}
catch {
@@ -73,153 +56,4 @@ class KafkaServerStartable(val serverCon
}
-class EmbeddedConsumer(private val consumerConfig: ConsumerConfig,
- private val producerConfig: ProducerConfig,
- private val kafkaServerStartable: KafkaServerStartable) extends TopicEventHandler[String] with Logging {
-
- private val whiteListTopics =
- consumerConfig.mirrorTopicsWhitelist.split(",").toList.map(_.trim)
-
- private val blackListTopics =
- consumerConfig.mirrorTopicsBlackList.split(",").toList.map(_.trim)
-
- // mirrorTopics should be accessed by handleTopicEvent only
- private var mirrorTopics:Seq[String] = List()
-
- private var consumerConnector: ConsumerConnector = null
- private var topicEventWatcher:ZookeeperTopicEventWatcher = null
-
- private val producer = new Producer[Null, Message](producerConfig)
-
- var threadList = List[MirroringThread]()
-
- private def isTopicAllowed(topic: String) = {
- if (consumerConfig.mirrorTopicsWhitelist.nonEmpty)
- whiteListTopics.contains(topic)
- else
- !blackListTopics.contains(topic)
- }
-
- // TopicEventHandler call-back only
- @Override
- def handleTopicEvent(allTopics: Seq[String]) {
- val newMirrorTopics = allTopics.filter(isTopicAllowed)
-
- val addedTopics = newMirrorTopics filterNot (mirrorTopics contains)
- if (addedTopics.nonEmpty)
- info("topic event: added topics = %s".format(addedTopics))
-
- val deletedTopics = mirrorTopics filterNot (newMirrorTopics contains)
- if (deletedTopics.nonEmpty)
- info("topic event: deleted topics = %s".format(deletedTopics))
-
- mirrorTopics = newMirrorTopics
-
- if (addedTopics.nonEmpty || deletedTopics.nonEmpty) {
- info("mirror topics = %s".format(mirrorTopics))
- startNewConsumerThreads(makeTopicMap(mirrorTopics))
- }
- }
-
- private def makeTopicMap(mirrorTopics: Seq[String]) = {
- if (mirrorTopics.nonEmpty)
- Utils.getConsumerTopicMap(mirrorTopics.mkString(
- "", ":%d,".format(consumerConfig.mirrorConsumerNumThreads),
- ":%d".format(consumerConfig.mirrorConsumerNumThreads)))
- else
- Utils.getConsumerTopicMap("")
- }
-
- private def startNewConsumerThreads(topicMap: Map[String, Int]) {
- if (topicMap.nonEmpty) {
- if (consumerConnector != null)
- consumerConnector.shutdown()
-
- /**
- * Before starting new consumer threads for the updated set of topics,
- * shutdown the existing mirroring threads. Since the consumer connector
- * is already shutdown, the mirroring threads should finish their task almost
- * instantaneously. If they don't, this points to an error that needs to be looked
- * into, and further mirroring should stop
- */
- threadList.foreach(_.shutdown)
-
- // KAFKA: 212: clear the thread list to remove the older thread references that are already shutdown
- threadList = Nil
-
- consumerConnector = Consumer.create(consumerConfig)
- val topicMessageStreams = consumerConnector.createMessageStreams(topicMap)
- for ((topic, streamList) <- topicMessageStreams)
- for (i <- 0 until streamList.length)
- threadList ::= new MirroringThread(streamList(i), topic, i)
-
- threadList.foreach(_.start)
- }
- else
- info("Not starting mirroring threads (mirror topic list is empty)")
- }
-
- def startup() {
- info("staring up embedded consumer")
- topicEventWatcher = new ZookeeperTopicEventWatcher(consumerConfig, this, kafkaServerStartable)
- /*
- * consumer threads are (re-)started upon topic events (which includes an
- * initial startup event which lists the current topics)
- */
- }
-
- def shutdown() {
- // first shutdown the topic watcher to prevent creating new consumer streams
- if (topicEventWatcher != null)
- topicEventWatcher.shutdown()
- info("Stopped the ZK watcher for new topics, now stopping the Kafka consumers")
- // stop pulling more data for mirroring
- if (consumerConnector != null)
- consumerConnector.shutdown()
- info("Stopped the kafka consumer threads for existing topics, now stopping the existing mirroring threads")
- // wait for all mirroring threads to stop
- threadList.foreach(_.shutdown)
- info("Stopped all existing mirroring threads, now stopping the producer")
- // only then, shutdown the producer
- producer.close()
- info("Successfully shutdown this Kafka mirror")
- }
-
- class MirroringThread(val stream: KafkaMessageStream[Message], val topic: String, val threadId: Int) extends Thread with Logging {
- val shutdownComplete = new CountDownLatch(1)
- val name = "kafka-embedded-consumer-%s-%d".format(topic, threadId)
- this.setDaemon(false)
- this.setName(name)
-
-
- override def run = {
- info("Starting mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
-
- try {
- for (message <- stream) {
- trace("Mirroring thread received message " + message.checksum)
- val pd = new ProducerData[Null, Message](topic, message)
- producer.send(pd)
- }
- }
- catch {
- case e =>
- fatal(topic + " stream " + threadId + " unexpectedly exited", e)
- }finally {
- shutdownComplete.countDown
- info("Stopped mirroring thread %s for topic %s and stream %d".format(name, topic, threadId))
- }
- }
-
- def shutdown = {
- try {
- shutdownComplete.await
- }catch {
- case e: InterruptedException => fatal("Shutdown of thread " + name + " interrupted. " +
- "Mirroring thread might leak data!")
- }
- }
- }
-}
-
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerShell.scala Thu May 31 01:51:23 2012
@@ -82,15 +82,15 @@ object ConsumerShell {
}
}
-class ZKConsumerThread(stream: KafkaMessageStream[String]) extends Thread with Logging {
+class ZKConsumerThread(stream: KafkaStream[String]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
override def run() {
println("Starting consumer thread..")
var count: Int = 0
try {
- for (message <- stream) {
- println("consumed: " + message)
+ for (messageAndMetadata <- stream) {
+ println("consumed: " + messageAndMetadata.message)
count += 1
}
}catch {
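
The stream's iterator now yields MessageAndMetadata wrappers rather than bare messages, so call sites unwrap with .message as above. A minimal consumer loop in the new style, wired up the same way as the Consumer.create / createMessageStreams usage elsewhere in this commit; the topic name and config values are placeholders:

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

// Sketch of consuming via the renamed KafkaStream; values are placeholders.
object MessageAndMetadataExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("groupid", "shell-example")      // consumer group (key name as of this branch)
    props.put("zk.connect", "localhost:2181")
    val connector = Consumer.create(new ConsumerConfig(props))
    val topicMessageStreams = connector.createMessageStreams(Map("test" -> 1))
    for ((topic, streamList) <- topicMessageStreams;
         stream <- streamList;
         messageAndMetadata <- stream)
      println("consumed from " + topic + ": " + messageAndMetadata.message)
  }
}
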
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ReplayLogProducer.scala Thu May 31 01:51:23 2012
@@ -32,8 +32,6 @@ object ReplayLogProducer extends Logging
private val GROUPID: String = "replay-log-producer"
def main(args: Array[String]) {
- var isNoPrint = false;
-
val config = new Config(args)
val executor = Executors.newFixedThreadPool(config.numThreads)
@@ -151,7 +149,7 @@ object ReplayLogProducer extends Logging
}
}
- class ZKConsumerThread(config: Config, stream: KafkaMessageStream[Message]) extends Thread with Logging {
+ class ZKConsumerThread(config: Config, stream: KafkaStream[Message]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val props = new Properties()
val brokerInfoList = config.brokerInfo.split("=")
@@ -180,9 +178,9 @@ object ReplayLogProducer extends Logging
stream.slice(0, config.numMessages)
else
stream
- for (message <- iter) {
+ for (messageAndMetadata <- iter) {
try {
- producer.send(new ProducerData[Message, Message](config.outputTopic, message))
+ producer.send(new ProducerData[Message, Message](config.outputTopic, messageAndMetadata.message))
if (config.delayedMSBtwSend > 0 && (messageCount + 1) % config.batchSize == 0)
Thread.sleep(config.delayedMSBtwSend)
messageCount += 1
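
Here stream.slice(0, n) bounds consumption to the first n messages. The same pattern, with a plain Scala iterator standing in for the stream:

// Illustration of bounded consumption via slice; the iterator stands in for a KafkaStream.
object SliceExample {
  def main(args: Array[String]) {
    val numMessages = 3
    val stream = Iterator("m1", "m2", "m3", "m4", "m5")
    val iter =
      if (numMessages >= 0) stream.slice(0, numMessages)
      else stream
    for (message <- iter)
      println("replaying: " + message)  // prints m1, m2, m3
  }
}
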
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Thu May 31 01:51:23 2012
@@ -57,9 +57,9 @@ object VerifyConsumerRebalance extends L
// check if the rebalancing operation succeeded.
try {
if(validateRebalancingOperation(zkClient, group))
- info("Rebalance operation successful !")
+ println("Rebalance operation successful !")
else
- error("Rebalance operation failed !")
+ println("Rebalance operation failed !")
} catch {
case e2: Throwable => error("Error while verifying current rebalancing operation", e2)
}
@@ -132,6 +132,4 @@ object VerifyConsumerRebalance extends L
rebalanceSucceeded
}
-
-
-}
\ No newline at end of file
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1344526&r1=1344525&r2=1344526&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Thu May 31 01:51:23 2012
@@ -23,17 +23,21 @@ trait Logging {
val loggerName = this.getClass.getName
lazy val logger = Logger.getLogger(loggerName)
+ protected var logIdent = ""
+
+ private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+
def trace(msg: => String): Unit = {
if (logger.isTraceEnabled())
- logger.trace(msg)
+ logger.trace(msgWithLogIdent(msg))
}
def trace(e: => Throwable): Any = {
if (logger.isTraceEnabled())
- logger.trace("",e)
+ logger.trace(logIdent,e)
}
def trace(msg: => String, e: => Throwable) = {
if (logger.isTraceEnabled())
- logger.trace(msg,e)
+ logger.trace(msgWithLogIdent(msg),e)
}
def swallowTrace(action: => Unit) {
Utils.swallow(logger.trace, action)
@@ -41,15 +45,15 @@ trait Logging {
def debug(msg: => String): Unit = {
if (logger.isDebugEnabled())
- logger.debug(msg)
+ logger.debug(msgWithLogIdent(msg))
}
def debug(e: => Throwable): Any = {
if (logger.isDebugEnabled())
- logger.debug("",e)
+ logger.debug(logIdent,e)
}
def debug(msg: => String, e: => Throwable) = {
if (logger.isDebugEnabled())
- logger.debug(msg,e)
+ logger.debug(msgWithLogIdent(msg),e)
}
def swallowDebug(action: => Unit) {
Utils.swallow(logger.debug, action)
@@ -57,55 +61,54 @@ trait Logging {
def info(msg: => String): Unit = {
if (logger.isInfoEnabled())
- logger.info(msg)
+ logger.info(msgWithLogIdent(msg))
}
def info(e: => Throwable): Any = {
if (logger.isInfoEnabled())
- logger.info("",e)
+ logger.info(logIdent,e)
}
def info(msg: => String,e: => Throwable) = {
if (logger.isInfoEnabled())
- logger.info(msg,e)
+ logger.info(msgWithLogIdent(msg),e)
}
def swallowInfo(action: => Unit) {
Utils.swallow(logger.info, action)
}
def warn(msg: => String): Unit = {
- logger.warn(msg)
+ logger.warn(msgWithLogIdent(msg))
}
def warn(e: => Throwable): Any = {
- logger.warn("",e)
+ logger.warn(logIdent,e)
}
def warn(msg: => String, e: => Throwable) = {
- logger.warn(msg,e)
+ logger.warn(msgWithLogIdent(msg),e)
}
def swallowWarn(action: => Unit) {
Utils.swallow(logger.warn, action)
}
def swallow(action: => Unit) = swallowWarn(action)
- def error(msg: => String):Unit = {
- logger.error(msg)
+ def error(msg: => String): Unit = {
+ logger.error(msgWithLogIdent(msg))
}
def error(e: => Throwable): Any = {
- logger.error("",e)
+ logger.error(logIdent,e)
}
def error(msg: => String, e: => Throwable) = {
- logger.error(msg,e)
+ logger.error(msgWithLogIdent(msg),e)
}
def swallowError(action: => Unit) {
Utils.swallow(logger.error, action)
}
def fatal(msg: => String): Unit = {
- logger.fatal(msg)
+ logger.fatal(msgWithLogIdent(msg))
}
def fatal(e: => Throwable): Any = {
- logger.fatal("",e)
+ logger.fatal(logIdent,e)
}
def fatal(msg: => String, e: => Throwable) = {
- logger.fatal(msg,e)
+ logger.fatal(msgWithLogIdent(msg),e)
}
-
}
\ No newline at end of file
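
The new logIdent field lets any class using the trait prefix every log line with its identity, without threading the prefix through each call. A made-up user of the trait:

import kafka.utils.Logging

// Made-up example: msgWithLogIdent prepends logIdent, so the info call
// below is logged as "fetcher-7 starting fetch loop".
class FetcherTask extends Logging {
  this.logIdent = "fetcher-7 "
  def start() {
    info("starting fetch loop")
  }
}
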