Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/11 18:16:27 UTC
svn commit: r1360261 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/consumer/ main/scala/kafka/server/
test/scala/unit/kafka/integration/
Author: junrao
Date: Wed Jul 11 16:16:26 2012
New Revision: 1360261
URL: http://svn.apache.org/viewvc?rev=1360261&view=rev
Log:
ZookeeperConsumerConnector needs to connect to new leader after leadership change; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-362
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Jul 11 16:16:26 2012
@@ -83,6 +83,9 @@ class ConsumerConfig(props: Properties)
/** backoff time between retries during rebalance */
val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
+ /** backoff time to wait before refreshing the leader of a partition that has lost its current leader */
+ val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
+
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
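
The new refresh.leader.backoff.ms setting is read when a ConsumerConfig is constructed; a minimal override sketch (only the new property is shown, and the remaining required consumer properties are assumed to be filled in elsewhere):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    // zookeeper connection, group id and any other required consumer
    // properties are assumed to be set here
    props.put("refresh.leader.backoff.ms", "500") // override the 200 ms default above
    val config = new ConsumerConfig(props)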
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1360261&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.cluster.{Cluster, Broker}
+import scala.collection.immutable
+import scala.collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.ZkUtils._
+import kafka.utils.SystemTime
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Usage:
+ * Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
+ * until shutdown() is called.
+ */
+class ConsumerFetcherManager(private val consumerIdString: String,
+ private val config: ConsumerConfig,
+ private val zkClient: ZkClient)
+ extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+ private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
+ private var cluster: Cluster = null
+ private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+ private val lock = new ReentrantLock
+ private val cond = lock.newCondition()
+ private val isShuttingDown = new AtomicBoolean(false)
+ private val leaderFinderThreadShutdownLatch = new CountDownLatch(1)
+ private val leaderFinderThread = new Thread(consumerIdString + "_leader_finder_thread") {
+ // thread responsible for adding the fetcher to the right broker when the leader is available
+ override def run() {
+ info("starting %s".format(getName))
+ while (!isShuttingDown.get) {
+ try {
+ lock.lock()
+ try {
+ if (noLeaderPartitionSet.isEmpty)
+ cond.await()
+ for ((topic, partitionId) <- noLeaderPartitionSet) {
+ // find the leader for this partition
+ getLeaderForPartition(zkClient, topic, partitionId) match {
+ case Some(leaderId) =>
+ cluster.getBroker(leaderId) match {
+ case Some(broker) =>
+ val pti = partitionMap((topic, partitionId))
+ addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+ noLeaderPartitionSet.remove((topic, partitionId))
+ case None =>
+ error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+ .format(leaderId, topic, partitionId))
+ }
+ case None => // let it go since we will keep retrying
+ }
+ }
+ } finally {
+ lock.unlock()
+ }
+ Thread.sleep(config.refreshLeaderBackoffMs)
+ } catch {
+ case t =>
+ if (!isShuttingDown.get())
+ error("error in %s".format(getName), t)
+ }
+ }
+ leaderFinderThreadShutdownLatch.countDown()
+ info("stopping %s".format(getName))
+ }
+ }
+ leaderFinderThread.start()
+
+
+ override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+ new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+ config, sourceBroker, this)
+ }
+
+ def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
+ if (isShuttingDown.get)
+ throw new RuntimeException("%s already shutdown".format(name))
+ lock.lock()
+ try {
+ partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
+ this.cluster = cluster
+ noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+ cond.signalAll()
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def stopAllConnections() {
+ lock.lock()
+ try {
+ partitionMap = null
+ noLeaderPartitionSet.clear()
+ } finally {
+ lock.unlock()
+ }
+ closeAllFetchers()
+ }
+
+ def getPartitionTopicInfo(key: (String, Int)) : PartitionTopicInfo = {
+ var pti: PartitionTopicInfo = null
+ lock.lock()
+ try {
+ pti = partitionMap(key)
+ } finally {
+ lock.unlock()
+ }
+ pti
+ }
+
+ def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+ debug("adding partitions with error %s".format(partitionList))
+ lock.lock()
+ try {
+ if (partitionMap != null) {
+ noLeaderPartitionSet ++= partitionList
+ cond.signalAll()
+ }
+ } finally {
+ lock.unlock()
+ }
+ }
+
+ def shutdown() {
+ info("shutting down")
+ isShuttingDown.set(true)
+ leaderFinderThread.interrupt()
+ leaderFinderThreadShutdownLatch.await()
+ stopAllConnections()
+ info("shutdown completed")
+ }
+}
\ No newline at end of file
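Per the Usage note in the scaladoc above, startConnections() and stopAllConnections() may alternate repeatedly on one manager instance, while shutdown() is terminal; a lifecycle sketch (consumerProps, zkClient, topicInfos and cluster are placeholders assumed to be built elsewhere):

    import kafka.consumer.{ConsumerConfig, ConsumerFetcherManager}

    val manager = new ConsumerFetcherManager("consumer1", new ConsumerConfig(consumerProps), zkClient)
    manager.startConnections(topicInfos, cluster) // fetchers start once leaders are found
    manager.stopAllConnections()                  // e.g. while a rebalance runs
    manager.startConnections(topicInfos, cluster) // allowed again after a stop
    manager.shutdown()                            // terminal; the manager cannot be restarted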
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1360261&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.cluster.Broker
+import kafka.server.AbstractFetcherThread
+import kafka.message.ByteBufferMessageSet
+import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+
+class ConsumerFetcherThread(name: String,
+ val config: ConsumerConfig,
+ sourceBroker: Broker,
+ val consumerFetcherManager: ConsumerFetcherManager)
+ extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
+ socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
+ fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs,
+ minBytes = config.minFetchBytes) {
+
+ // process fetched data
+ def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
+ val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
+ if (pti.getFetchOffset != fetchOffset)
+ throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
+ .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset))
+ pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+ }
+
+ // handle a partition whose offset is out of range and return a new fetch offset
+ def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+ var startTimestamp : Long = 0
+ config.autoOffsetReset match {
+ case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
+ case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
+ case _ => startTimestamp = OffsetRequest.LatestTime
+ }
+ val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
+
+ val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
+ pti.resetFetchOffset(newOffset)
+ pti.resetConsumeOffset(newOffset)
+ return newOffset
+ }
+
+ // deal with partitions with errors, potentially due to leadership changes
+ def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+ consumerFetcherManager.addPartitionsWithError(partitions)
+ }
+}
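
The reset logic in handleOffsetOutOfRange() maps the consumer's autoOffsetReset setting to an offset-request timestamp; the same mapping can be written as a single match expression, avoiding the mutable startTimestamp variable (a behaviorally equivalent sketch):

    val startTimestamp = config.autoOffsetReset match {
      case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
      case OffsetRequest.LargestTimeString  => OffsetRequest.LatestTime
      case _                                => OffsetRequest.LatestTime
    }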
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Jul 11 16:16:26 2012
@@ -50,20 +50,18 @@ private[consumer] class PartitionTopicIn
/**
* Enqueue a message set for processing
- * @return the number of valid bytes
*/
- def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+ def enqueue(messages: ByteBufferMessageSet) {
val size = messages.validBytes
if(size > 0) {
// update fetched offset to the compressed data chunk size, not the decompressed message set size
trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
- chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+ chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
}
- size
}
/**
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Jul 11 16:16:26 2012
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerCo
with Logging {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
- private var fetcher: Option[Fetcher] = None
+ private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
// topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
@@ -143,7 +143,7 @@ private[kafka] class ZookeeperConsumerCo
private def createFetcher() {
if (enableFetcher)
- fetcher = Some(new Fetcher(config, zkClient))
+ fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
}
private def connectZk() {
@@ -161,7 +161,7 @@ private[kafka] class ZookeeperConsumerCo
try {
scheduler.shutdownNow()
fetcher match {
- case Some(f) => f.stopConnectionsToAllBrokers
+ case Some(f) => f.shutdown
case None =>
}
sendShutdownToAllQueues()
@@ -376,8 +376,6 @@ private[kafka] class ZookeeperConsumerCo
class ZKRebalancerListener(val group: String, val consumerIdString: String,
val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
extends IZkChildListener {
- private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
- private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
private var isWatcherTriggered = false
private val lock = new ReentrantLock
private val cond = lock.newCondition()
@@ -547,22 +545,39 @@ private[kafka] class ZookeeperConsumerCo
queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
fetcher match {
- case Some(f) => f.stopConnectionsToAllBrokers
- f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
- info("Committing all offsets after clearing the fetcher queues")
- /**
- * here, we need to commit offsets before stopping the consumer from returning any more messages
- * from the current data chunk. Since partition ownership is not yet released, this commit offsets
- * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
- * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
- * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
- * successfully and the fetchers restart to fetch more data chunks
- **/
- commitOffsets
+ case Some(f) =>
+ f.stopAllConnections
+ clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+ info("Committing all offsets after clearing the fetcher queues")
+ /**
+ * here, we need to commit offsets before stopping the consumer from returning any more messages
+ * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+ * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+ * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+ * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+ * successfully and the fetchers restart to fetch more data chunks
+ **/
+ commitOffsets
case None =>
}
}
+ private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+ queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+ messageStreams: Map[String,List[KafkaStream[_]]]) {
+
+ // Clear all but the currently iterated upon chunk in the consumer thread's queue
+ queuesToBeCleared.foreach(_.clear)
+ info("Cleared all relevant queues for this fetcher")
+
+ // Also clear the currently iterated upon chunk in the consumer threads
+ if(messageStreams != null)
+ messageStreams.foreach(_._2.foreach(s => s.clear()))
+
+ info("Cleared the data chunks in all the consumer message iterators")
+
+ }
+
private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
relevantTopicThreadIdsMap: Map[String, Set[String]]) {
// only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -21,14 +21,14 @@ import scala.collection.mutable
import kafka.utils.Logging
import kafka.cluster.Broker
-abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
// map of (source brokerid, fetcher Id per source broker) => fetcher
private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = name + " "
private def getFetcherId(topic: String, partitionId: Int) : Int = {
- (topic.hashCode() + 31 * partitionId) % numReplicaFetchers
+ (topic.hashCode() + 31 * partitionId) % numFetchers
}
// to be defined in subclass to create a specific fetcher
@@ -73,13 +73,12 @@ abstract class AbstractFetcherManager(na
None
}
- def shutdown() = {
- info("shutting down")
+ def closeAllFetchers() {
mapLock synchronized {
for ( (_, fetcher) <- fetcherThreadMap) {
fetcher.shutdown
}
+ fetcherThreadMap.clear()
}
- info("shutdown completes")
}
}
\ No newline at end of file
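For illustration, getFetcherId() above maps each (topic, partition) to one of numFetchers thread slots per source broker; a standalone sketch of the same arithmetic (note that String.hashCode can be negative, so the raw result can be negative for some topic names):

    // same arithmetic as getFetcherId(), lifted out for illustration
    def fetcherId(topic: String, partitionId: Int, numFetchers: Int): Int =
      (topic.hashCode() + 31 * partitionId) % numFetchers

    // consecutive partitions of one topic spread across the fetcher slots
    (0 until 4).foreach(p => println(fetcherId("test-topic", p, 2)))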
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -23,9 +23,9 @@ import kafka.consumer.SimpleConsumer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.Logging
import kafka.common.ErrorMapping
-import kafka.api.{PartitionData, FetchRequestBuilder}
-import scala.collection.mutable
+import collection.mutable
import kafka.message.ByteBufferMessageSet
+import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
/**
* Abstract class for fetching data from multiple partitions from the same broker.
@@ -42,18 +42,18 @@ abstract class AbstractFetcherThread(val
info("starting")
// callbacks to be defined in subclass
- // process fetched data and return the new fetch offset
+ // process fetched data
def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
- // any logic for partitions whose leader has changed
- def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit
+ // deal with partitions with errors, potentially due to leadership changes
+ def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
override def run() {
try {
- while(isRunning.get()) {
+ while(isRunning.get) {
val builder = new FetchRequestBuilder().
clientId(name).
replicaId(fetcherBrokerId).
@@ -66,47 +66,61 @@ abstract class AbstractFetcherThread(val
}
val fetchRequest = builder.build()
- val response = simpleConsumer.fetch(fetchRequest)
- trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+ val partitionsWithError = new mutable.HashSet[(String, Int)]
+ var response: FetchResponse = null
+ try {
+ trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+ response = simpleConsumer.fetch(fetchRequest)
+ } catch {
+ case t =>
+ debug("error in fetch %s".format(fetchRequest), t)
+ if (isRunning.get) {
+ fetchMapLock synchronized {
+ partitionsWithError ++= fetchMap.keys
+ fetchMap.clear()
+ }
+ }
+ }
- var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
- // process fetched data
- fetchMapLock synchronized {
- for ( topicData <- response.data ) {
- for ( partitionData <- topicData.partitionDataArray) {
- val topic = topicData.topic
- val partitionId = partitionData.partition
- val key = (topic, partitionId)
- val currentOffset = fetchMap.get(key)
- if (currentOffset.isDefined) {
- partitionData.error match {
- case ErrorMapping.NoError =>
- processPartitionData(topic, currentOffset.get, partitionData)
- val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
- fetchMap.put(key, newOffset)
- case ErrorMapping.OffsetOutOfRangeCode =>
- val newOffset = handleOffsetOutOfRange(topic, partitionId)
- fetchMap.put(key, newOffset)
- warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
- .format(currentOffset.get, topic, partitionId, newOffset))
- case ErrorMapping.NotLeaderForPartitionCode =>
- partitionsWithNewLeader ::= key
- case _ =>
- error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
- ErrorMapping.exceptionFor(partitionData.error))
+ if (response != null) {
+ // process fetched data
+ fetchMapLock synchronized {
+ for ( topicData <- response.data ) {
+ for ( partitionData <- topicData.partitionDataArray) {
+ val topic = topicData.topic
+ val partitionId = partitionData.partition
+ val key = (topic, partitionId)
+ val currentOffset = fetchMap.get(key)
+ if (currentOffset.isDefined) {
+ partitionData.error match {
+ case ErrorMapping.NoError =>
+ processPartitionData(topic, currentOffset.get, partitionData)
+ val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+ fetchMap.put(key, newOffset)
+ case ErrorMapping.OffsetOutOfRangeCode =>
+ val newOffset = handleOffsetOutOfRange(topic, partitionId)
+ fetchMap.put(key, newOffset)
+ warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+ .format(currentOffset.get, topic, partitionId, newOffset))
+ case _ =>
+ error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+ ErrorMapping.exceptionFor(partitionData.error))
+ partitionsWithError += key
+ fetchMap.remove(key)
+ }
}
}
}
}
}
- if (partitionsWithNewLeader.size > 0) {
- debug("changing leaders for %s".format(partitionsWithNewLeader))
- handlePartitionsWithNewLeader(partitionsWithNewLeader)
+ if (partitionsWithError.size > 0) {
+ debug("handling partitions with error for %s".format(partitionsWithError))
+ handlePartitionsWithErrors(partitionsWithError)
}
}
} catch {
- case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down")
- case e1 => error("error in replica fetcher runnable", e1)
+ case e: InterruptedException => info("interrupted. Shutting down")
+ case e1 => error("error in fetching", e1)
}
shutdownComplete()
}
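
The three callbacks above define the complete contract for an AbstractFetcherThread subclass; a minimal no-op skeleton, with constructor arguments following the pattern used by ConsumerFetcherThread (all concrete values here are illustrative):

    import kafka.api.{FetchRequest, PartitionData}
    import kafka.cluster.Broker
    import kafka.server.AbstractFetcherThread

    class NoOpFetcherThread(name: String, sourceBroker: Broker)
      extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker,
                                    socketTimeout = 30 * 1000, socketBufferSize = 64 * 1024,
                                    fetchSize = 1024 * 1024, fetcherBrokerId = FetchRequest.NonFollowerId,
                                    maxWait = 500, minBytes = 1) {
      // called for each partition fetched without error
      def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {}

      // called on OffsetOutOfRange; returns the offset to resume fetching from
      def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = 0L

      // called with partitions that hit any other error, e.g. after a leadership change
      def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {}
    }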
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Jul 11 16:16:26 2012
@@ -132,8 +132,10 @@ class KafkaConfig(props: Properties) ext
/** the number of bytes of messages to attempt to fetch */
val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
+ /** max wait time for each fetch request issued by follower replicas */
val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
+ /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
/* number of fetcher threads used to replicate messages from a source broker.
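
Both replica fetch settings are plain broker properties; a minimal override sketch (the remaining required broker properties are assumed to be present):

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    // brokerid, log directories and other required broker properties assumed set
    props.put("replica.fetch.wait.time.ms", "1000") // follower fetches wait up to 1 s
    props.put("replica.fetch.min.bytes", "1")       // respond as soon as any bytes arrive
    val config = new KafkaConfig(props)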
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -22,8 +22,13 @@ import kafka.cluster.Broker
class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
- def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+ override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
}
+ def shutdown() {
+ info("shutting down")
+ closeAllFetchers()
+ info("shutdown completed")
+ }
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -27,8 +27,8 @@ class ReplicaFetcherThread(name:String,
fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
minBytes = brokerConfig.replicaMinBytes) {
- // process fetched data and return the new fetch offset
- def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = {
+ // process fetched data
+ def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
val partitionId = partitionData.partition
val replica = replicaMgr.getReplica(topic, partitionId).get
val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
@@ -51,7 +51,7 @@ class ReplicaFetcherThread(name:String,
}
// any logic for partitions whose leader has changed
- def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
+ def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
// no handler needed since the controller will make the changes accordingly
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Wed Jul 11 16:16:26 2012
@@ -51,18 +51,18 @@ class FetcherTest extends JUnit3Suite wi
new AtomicLong(0),
new AtomicInteger(0)))
- var fetcher: Fetcher = null
+ var fetcher: ConsumerFetcherManager = null
override def setUp() {
super.setUp
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
- fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
- fetcher.stopConnectionsToAllBrokers
+ fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
+ fetcher.stopAllConnections()
fetcher.startConnections(topicInfos, cluster)
}
override def tearDown() {
- fetcher.stopConnectionsToAllBrokers
+ fetcher.shutdown()
super.tearDown
}
@@ -103,5 +103,5 @@ class FetcherTest extends JUnit3Suite wi
return
}
}
-
+
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Wed Jul 11 16:16:26 2012
@@ -42,10 +42,8 @@ class LogCorruptionTest extends JUnit3Su
def testMessageSizeTooLarge() {
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
- val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
requestHandlerLogger.setLevel(Level.FATAL)
- fetcherLogger.setLevel(Level.FATAL)
// send some messages
val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))
@@ -100,6 +98,5 @@ class LogCorruptionTest extends JUnit3Su
zkConsumerConnector1.shutdown
requestHandlerLogger.setLevel(Level.ERROR)
- fetcherLogger.setLevel(Level.ERROR)
}
}