Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/11 18:16:27 UTC

svn commit: r1360261 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/consumer/ main/scala/kafka/server/ test/scala/unit/kafka/integration/

Author: junrao
Date: Wed Jul 11 16:16:26 2012
New Revision: 1360261

URL: http://svn.apache.org/viewvc?rev=1360261&view=rev
Log:
ZookeeperConsumerConnector needs to connect to new leader after leadership change; patched by Jun Rao; reviewed by Joel Koshy; KAFKA-362

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/Fetcher.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Wed Jul 11 16:16:26 2012
@@ -83,6 +83,9 @@ class ConsumerConfig(props: Properties) 
   /** backoff time between retries during rebalance */
   val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
 
+  /** backoff time to refresh the leader of a partition after it loses the current leader */
+  val refreshLeaderBackoffMs = Utils.getInt(props, "refresh.leader.backoff.ms", 200)
+
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
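
As a usage illustration (not part of the commit): only refresh.leader.backoff.ms below comes from this patch; the connection and group keys use 0.8-era names with placeholder values and are assumptions, as is the rest of the required consumer configuration.

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties()
    // assumed connection/group settings with placeholder values
    props.put("zk.connect", "localhost:2181")
    props.put("groupid", "example-group")
    // from this patch: back off 1000 ms before re-querying ZooKeeper for a partition's leader
    props.put("refresh.leader.backoff.ms", "1000")
    val consumerConfig = new ConsumerConfig(props)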

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1360261&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.server.{AbstractFetcherThread, AbstractFetcherManager}
+import kafka.cluster.{Cluster, Broker}
+import scala.collection.immutable
+import scala.collection.mutable
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.ZkUtils._
+import kafka.utils.SystemTime
+import java.util.concurrent.CountDownLatch
+
+/**
+ *  Usage:
+ *  Once ConsumerFetcherManager is created, startConnections() and stopAllConnections() can be called repeatedly
+ *  until shutdown() is called.
+ */
+class ConsumerFetcherManager(private val consumerIdString: String,
+                             private val config: ConsumerConfig,
+                             private val zkClient : ZkClient)
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+  private var partitionMap: immutable.Map[(String, Int), PartitionTopicInfo] = null
+  private var cluster: Cluster = null
+  private val noLeaderPartitionSet = new mutable.HashSet[(String, Int)]
+  private val lock = new ReentrantLock
+  private val cond = lock.newCondition()
+  private val isShuttingDown = new AtomicBoolean(false)
+  private val leaderFinderThreadShutdownLatch = new CountDownLatch(1)  
+  private val leaderFinderThread = new Thread(consumerIdString + "_leader_finder_thread") {
+    // thread responsible for adding the fetcher to the right broker when leader is available
+    override def run() {
+      info("starting %s".format(getName))
+      while (!isShuttingDown.get) {
+        try {
+          lock.lock()
+          try {
+            if (noLeaderPartitionSet.isEmpty)
+              cond.await()
+            for ((topic, partitionId) <- noLeaderPartitionSet) {
+              // find the leader for this partition
+              getLeaderForPartition(zkClient, topic, partitionId) match {
+                case Some(leaderId) =>
+                  cluster.getBroker(leaderId) match {
+                    case Some(broker) =>
+                      val pti = partitionMap((topic, partitionId))
+                      addFetcher(topic, partitionId, pti.getFetchOffset(), broker)
+                      noLeaderPartitionSet.remove((topic, partitionId))
+                    case None =>
+                      error("Broker %d is unavailable, fetcher for topic %s partition %d could not be started"
+                            .format(leaderId, topic, partitionId))
+                  }
+                case None => // let it go since we will keep retrying
+              }
+            }
+          } finally {
+            lock.unlock()
+          }
+          Thread.sleep(config.refreshLeaderBackoffMs)
+        } catch {
+          case t =>
+            if (!isShuttingDown.get())
+              error("error in %s".format(getName), t)
+        }
+      }
+      leaderFinderThreadShutdownLatch.countDown()
+      info("stopping %s".format(getName))
+    }
+  }
+  leaderFinderThread.start()
+
+
+  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+                              config, sourceBroker, this)
+  }
+
+  def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
+    if (isShuttingDown.get)
+      throw new RuntimeException("%s already shutdown".format(name))
+    lock.lock()
+    try {
+      partitionMap = topicInfos.map(tpi => ((tpi.topic, tpi.partitionId), tpi)).toMap
+      this.cluster = cluster
+      noLeaderPartitionSet ++= topicInfos.map(tpi => (tpi.topic, tpi.partitionId))
+      cond.signalAll()
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def stopAllConnections() {
+    lock.lock()
+    try {
+      partitionMap = null
+      noLeaderPartitionSet.clear()
+    } finally {
+      lock.unlock()
+    }
+    closeAllFetchers()
+  }
+
+  def getPartitionTopicInfo(key: (String, Int)) : PartitionTopicInfo = {
+    var pti: PartitionTopicInfo = null
+    lock.lock()
+    try {
+      pti = partitionMap(key)
+    } finally {
+      lock.unlock()
+    }
+    pti      
+  }
+
+  def addPartitionsWithError(partitionList: Iterable[(String, Int)]) {
+    debug("adding partitions with error %s".format(partitionList))
+    lock.lock()
+    try {
+      if (partitionMap != null) {
+        noLeaderPartitionSet ++= partitionList
+        cond.signalAll()
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  def shutdown() {
+    info("shutting down")
+    isShuttingDown.set(true)
+    leaderFinderThread.interrupt()
+    leaderFinderThreadShutdownLatch.await()
+    stopAllConnections()
+    info("shutdown completed")
+  }
+}
\ No newline at end of file
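
The usage note at the top of this file is terse, so here is a minimal sketch of the intended call sequence (illustrative only; consumerConfig, zkClient, topicInfos and cluster are assumed to already exist):

    // construct once per consumer instance
    val fetcherManager = new ConsumerFetcherManager("consumer1", consumerConfig, zkClient)

    // start fetchers; the leader-finder thread looks up each partition's leader in ZK
    // and attaches a ConsumerFetcherThread to that broker
    fetcherManager.startConnections(topicInfos, cluster)

    // during a rebalance, stop the fetchers; startConnections/stopAllConnections
    // may be called repeatedly until shutdown() is invoked
    fetcherManager.stopAllConnections()

    // final teardown: stops the leader-finder thread and closes all fetchers
    fetcherManager.shutdown()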

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1360261&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import kafka.cluster.Broker
+import kafka.server.AbstractFetcherThread
+import kafka.message.ByteBufferMessageSet
+import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+
+class ConsumerFetcherThread(name: String,
+                            val config: ConsumerConfig,
+                            sourceBroker: Broker,
+                            val consumerFetcherManager: ConsumerFetcherManager)
+        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
+          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
+          fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs,
+          minBytes = config.minFetchBytes) {
+
+  // process fetched data
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
+    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
+    if (pti.getFetchOffset != fetchOffset)
+      throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
+                                .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset))
+    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+  }
+
+  // handle a partition whose offset is out of range and return a new fetch offset
+  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+    var startTimestamp : Long = 0
+    config.autoOffsetReset match {
+      case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime
+      case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
+      case _ => startTimestamp = OffsetRequest.LatestTime
+    }
+    val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
+
+    val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
+    pti.resetFetchOffset(newOffset)
+    pti.resetConsumeOffset(newOffset)
+    return newOffset
+  }
+
+  // any logic for partitions whose leader has changed
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+    consumerFetcherManager.addPartitionsWithError(partitions)
+  }
+}
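
For readers following handleOffsetOutOfRange above, the reset-policy mapping reads as a small standalone helper; a sketch using only identifiers that appear in this diff (the helper name itself is invented):

    import kafka.api.OffsetRequest

    // map the configured reset policy ("smallest"/"largest") to the timestamp value
    // that is passed to getOffsetsBefore; anything else falls back to the latest offset
    def resetTimestamp(autoOffsetReset: String): Long = autoOffsetReset match {
      case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime
      case OffsetRequest.LargestTimeString  => OffsetRequest.LatestTime
      case _                                => OffsetRequest.LatestTime
    }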

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Wed Jul 11 16:16:26 2012
@@ -50,20 +50,18 @@ private[consumer] class PartitionTopicIn
 
   /**
    * Enqueue a message set for processing
-   * @return the number of valid bytes
    */
-  def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+  def enqueue(messages: ByteBufferMessageSet) {
     val size = messages.validBytes
     if(size > 0) {
       // update fetched offset to the compressed data chunk size, not the decompressed message set size
       trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
-      chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
+      chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       val newOffset = fetchedOffset.addAndGet(size)
       debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
       ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
       ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
     }
-    size
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Jul 11 16:16:26 2012
@@ -91,7 +91,7 @@ private[kafka] class ZookeeperConsumerCo
         with Logging {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
-  private var fetcher: Option[Fetcher] = None
+  private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
@@ -143,7 +143,7 @@ private[kafka] class ZookeeperConsumerCo
 
   private def createFetcher() {
     if (enableFetcher)
-      fetcher = Some(new Fetcher(config, zkClient))
+      fetcher = Some(new ConsumerFetcherManager(consumerIdString, config, zkClient))
   }
 
   private def connectZk() {
@@ -161,7 +161,7 @@ private[kafka] class ZookeeperConsumerCo
       try {
         scheduler.shutdownNow()
         fetcher match {
-          case Some(f) => f.stopConnectionsToAllBrokers
+          case Some(f) => f.shutdown
           case None =>
         }
         sendShutdownToAllQueues()
@@ -376,8 +376,6 @@ private[kafka] class ZookeeperConsumerCo
   class ZKRebalancerListener(val group: String, val consumerIdString: String,
                              val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_]]])
     extends IZkChildListener {
-    private var oldPartitionsPerTopicMap: mutable.Map[String, Seq[String]] = new mutable.HashMap[String, Seq[String]]()
-    private var oldConsumersPerTopicMap: mutable.Map[String,List[String]] = new mutable.HashMap[String,List[String]]()
     private var isWatcherTriggered = false
     private val lock = new ReentrantLock
     private val cond = lock.newCondition()
@@ -547,22 +545,39 @@ private[kafka] class ZookeeperConsumerCo
                                        queuesToBeCleared: Iterable[BlockingQueue[FetchedDataChunk]]) {
       var allPartitionInfos = topicRegistry.values.map(p => p.values).flatten
       fetcher match {
-        case Some(f) => f.stopConnectionsToAllBrokers
-        f.clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
-        info("Committing all offsets after clearing the fetcher queues")
-        /**
-        * here, we need to commit offsets before stopping the consumer from returning any more messages
-        * from the current data chunk. Since partition ownership is not yet released, this commit offsets
-        * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
-        * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
-        * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
-        * successfully and the fetchers restart to fetch more data chunks
-        **/
-        commitOffsets
+        case Some(f) =>
+          f.stopAllConnections
+          clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams)
+          info("Committing all offsets after clearing the fetcher queues")
+          /**
+          * here, we need to commit offsets before stopping the consumer from returning any more messages
+          * from the current data chunk. Since partition ownership is not yet released, this commit offsets
+          * call will ensure that the offsets committed now will be used by the next consumer thread owning the partition
+          * for the current data chunk. Since the fetchers are already shutdown and this is the last chunk to be iterated
+          * by the consumer, there will be no more messages returned by this iterator until the rebalancing finishes
+          * successfully and the fetchers restart to fetch more data chunks
+          **/
+          commitOffsets
         case None =>
       }
     }
 
+    private def clearFetcherQueues(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster,
+                                   queuesTobeCleared: Iterable[BlockingQueue[FetchedDataChunk]],
+                                   messageStreams: Map[String,List[KafkaStream[_]]]) {
+
+      // Clear all but the currently iterated upon chunk in the consumer thread's queue
+      queuesTobeCleared.foreach(_.clear)
+      info("Cleared all relevant queues for this fetcher")
+
+      // Also clear the currently iterated upon chunk in the consumer threads
+      if(messageStreams != null)
+         messageStreams.foreach(_._2.foreach(s => s.clear()))
+
+      info("Cleared the data chunks in all the consumer message iterators")
+
+    }
+
     private def closeFetchers(cluster: Cluster, messageStreams: Map[String,List[KafkaStream[_]]],
                               relevantTopicThreadIdsMap: Map[String, Set[String]]) {
       // only clear the fetcher queues for certain topic partitions that *might* no longer be served by this consumer

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -21,14 +21,14 @@ import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.cluster.Broker
 
-abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = name + " "
 
   private def getFetcherId(topic: String, partitionId: Int) : Int = {
-    (topic.hashCode() + 31 * partitionId) % numReplicaFetchers
+    (topic.hashCode() + 31 * partitionId) % numFetchers
   }
 
   // to be defined in subclass to create a specific fetcher
@@ -73,13 +73,12 @@ abstract class AbstractFetcherManager(na
     None
   }
 
-  def shutdown() = {
-    info("shutting down")
+  def closeAllFetchers() {
     mapLock synchronized {
       for ( (_, fetcher) <- fetcherThreadMap) {
         fetcher.shutdown
       }
+      fetcherThreadMap.clear()
     }
-    info("shutdown completes")
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -23,9 +23,9 @@ import kafka.consumer.SimpleConsumer
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils.Logging
 import kafka.common.ErrorMapping
-import kafka.api.{PartitionData, FetchRequestBuilder}
-import scala.collection.mutable
+import collection.mutable
 import kafka.message.ByteBufferMessageSet
+import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
@@ -42,18 +42,18 @@ abstract class AbstractFetcherThread(val
   info("starting")
   // callbacks to be defined in subclass
 
-  // process fetched data and return the new fetch offset
+  // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
 
-  // any logic for partitions whose leader has changed
-  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit
+  // deal with partitions with errors, potentially due to leadership changes
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)])
 
   override def run() {
     try {
-      while(isRunning.get()) {
+      while(isRunning.get) {
         val builder = new FetchRequestBuilder().
           clientId(name).
           replicaId(fetcherBrokerId).
@@ -66,47 +66,61 @@ abstract class AbstractFetcherThread(val
         }
 
         val fetchRequest = builder.build()
-        val response = simpleConsumer.fetch(fetchRequest)
-        trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+        val partitionsWithError = new mutable.HashSet[(String, Int)]
+        var response: FetchResponse = null
+        try {
+          trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+          response = simpleConsumer.fetch(fetchRequest)
+        } catch {
+          case t =>
+            debug("error in fetch %s".format(fetchRequest), t)
+            if (isRunning.get) {
+              fetchMapLock synchronized {
+                partitionsWithError ++= fetchMap.keys
+                fetchMap.clear()
+              }
+            }
+        }
 
-        var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
-        // process fetched data
-        fetchMapLock synchronized {
-          for ( topicData <- response.data ) {
-            for ( partitionData <- topicData.partitionDataArray) {
-              val topic = topicData.topic
-              val partitionId = partitionData.partition
-              val key = (topic, partitionId)
-              val currentOffset = fetchMap.get(key)
-              if (currentOffset.isDefined) {
-                partitionData.error match {
-                  case ErrorMapping.NoError =>
-                    processPartitionData(topic, currentOffset.get, partitionData)
-                    val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
-                    fetchMap.put(key, newOffset)
-                  case ErrorMapping.OffsetOutOfRangeCode =>
-                    val newOffset = handleOffsetOutOfRange(topic, partitionId)
-                    fetchMap.put(key, newOffset)
-                    warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                      .format(currentOffset.get, topic, partitionId, newOffset))
-                  case ErrorMapping.NotLeaderForPartitionCode =>
-                    partitionsWithNewLeader ::= key
-                  case _ =>
-                    error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
-                          ErrorMapping.exceptionFor(partitionData.error))
+        if (response != null) {
+          // process fetched data
+          fetchMapLock synchronized {
+            for ( topicData <- response.data ) {
+              for ( partitionData <- topicData.partitionDataArray) {
+                val topic = topicData.topic
+                val partitionId = partitionData.partition
+                val key = (topic, partitionId)
+                val currentOffset = fetchMap.get(key)
+                if (currentOffset.isDefined) {
+                  partitionData.error match {
+                    case ErrorMapping.NoError =>
+                      processPartitionData(topic, currentOffset.get, partitionData)
+                      val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                      fetchMap.put(key, newOffset)
+                    case ErrorMapping.OffsetOutOfRangeCode =>
+                      val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                      fetchMap.put(key, newOffset)
+                      warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                        .format(currentOffset.get, topic, partitionId, newOffset))
+                    case _ =>
+                      error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
+                            ErrorMapping.exceptionFor(partitionData.error))
+                      partitionsWithError += key
+                      fetchMap.remove(key)
+                  }
                 }
               }
             }
           }
         }
-        if (partitionsWithNewLeader.size > 0) {
-          debug("changing leaders for %s".format(partitionsWithNewLeader))
-          handlePartitionsWithNewLeader(partitionsWithNewLeader)
+        if (partitionsWithError.size > 0) {
+          debug("handling partitions with error for %s".format(partitionsWithError))
+          handlePartitionsWithErrors(partitionsWithError)
         }
       }
     } catch {
-      case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down")
-      case e1 => error("error in replica fetcher runnable", e1)
+      case e: InterruptedException => info("interrupted. Shutting down")
+      case e1 => error("error in fetching", e1)
     }
     shutdownComplete()
   }
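
To make the new callback contract concrete, a skeletal subclass might look like the following (a sketch only; the class name is invented and the numeric constructor arguments are illustrative values, not defaults taken from this code):

    import kafka.api.{FetchRequest, PartitionData}
    import kafka.cluster.Broker
    import kafka.server.AbstractFetcherThread

    class NoOpFetcherThread(name: String, sourceBroker: Broker)
            extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker,
              socketTimeout = 30000, socketBufferSize = 64*1024, fetchSize = 1024*1024,
              fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = 500, minBytes = 1) {

      // called for every partition fetched without error
      def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {}

      // called when the broker reports OffsetOutOfRange; returns the offset to resume from
      def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = 0L

      // called with partitions that hit any other fetch error (e.g. a leadership change);
      // a real implementation hands them back to a manager for leader re-resolution
      def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {}
    }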

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Wed Jul 11 16:16:26 2012
@@ -132,8 +132,10 @@ class KafkaConfig(props: Properties) ext
   /** the number of bytes of messages to attempt to fetch */
   val replicaFetchSize = Utils.getInt(props, "replica.fetch.size", ConsumerConfig.FetchSize)
 
+  /** max wait time for each fetch request issued by follower replicas */
   val replicaMaxWaitTimeMs = Utils.getInt(props, "replica.fetch.wait.time.ms", 500)
 
+  /** minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs */
   val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
 
   /* number of fetcher threads used to replicate messages from a source broker.
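
As an illustration (not part of the commit), the two settings documented above are supplied through the broker's property file; only the two replica.fetch.* keys below come from this file, and the remaining broker configuration is assumed:

    import java.util.Properties

    val brokerProps = new Properties()
    // ... other required broker settings omitted ...
    // wait at most 500 ms for the leader to accumulate data for a fetch request ...
    brokerProps.put("replica.fetch.wait.time.ms", "500")
    // ... unless at least this many bytes can already be returned
    brokerProps.put("replica.fetch.min.bytes", "4096")

These values feed KafkaConfig.replicaMaxWaitTimeMs and KafkaConfig.replicaMinBytes shown above.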

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Wed Jul 11 16:16:26 2012
@@ -22,8 +22,13 @@ import kafka.cluster.Broker
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
         extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
 
-  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+  override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
   }
 
+  def shutdown() {
+    info("shutting down")
+    closeAllFetchers()
+    info("shutdown completed")
+  }  
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Wed Jul 11 16:16:26 2012
@@ -27,8 +27,8 @@ class ReplicaFetcherThread(name:String, 
           fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
           minBytes = brokerConfig.replicaMinBytes) {
 
-  // process fetched data and return the new fetch offset
-  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = {
+  // process fetched data
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
     val partitionId = partitionData.partition
     val replica = replicaMgr.getReplica(topic, partitionId).get
     val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
@@ -51,7 +51,7 @@ class ReplicaFetcherThread(name:String, 
   }
 
   // any logic for partitions whose leader has changed
-  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
+  def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
     // no handler needed since the controller will make the changes accordingly
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Wed Jul 11 16:16:26 2012
@@ -51,18 +51,18 @@ class FetcherTest extends JUnit3Suite wi
                                                       new AtomicLong(0), 
                                                       new AtomicInteger(0)))
   
-  var fetcher: Fetcher = null
+  var fetcher: ConsumerFetcherManager = null
 
   override def setUp() {
     super.setUp
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
-    fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
-    fetcher.stopConnectionsToAllBrokers
+    fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
+    fetcher.stopAllConnections()
     fetcher.startConnections(topicInfos, cluster)
   }
 
   override def tearDown() {
-    fetcher.stopConnectionsToAllBrokers
+    fetcher.shutdown()
     super.tearDown
   }
     
@@ -103,5 +103,5 @@ class FetcherTest extends JUnit3Suite wi
         return
     }
   }
-  
+    
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1360261&r1=1360260&r2=1360261&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Wed Jul 11 16:16:26 2012
@@ -42,10 +42,8 @@ class LogCorruptionTest extends JUnit3Su
 
   def testMessageSizeTooLarge() {
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
-    val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
 
     requestHandlerLogger.setLevel(Level.FATAL)
-    fetcherLogger.setLevel(Level.FATAL)
 
     // send some messages
     val producerData = new ProducerData[String, Message](topic, topic, List(new Message("hello".getBytes())))
@@ -100,6 +98,5 @@ class LogCorruptionTest extends JUnit3Su
 
     zkConsumerConnector1.shutdown
     requestHandlerLogger.setLevel(Level.ERROR)
-    fetcherLogger.setLevel(Level.ERROR)
   }
 }