You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/23 09:14:25 UTC

svn commit: r1353086 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/log/ main/scala/kafka/server/ test/scala/unit/kafka/server/ test/scala/unit/kafka/utils/

Author: junrao
Date: Sat Jun 23 07:14:24 2012
New Revision: 1353086

URL: http://svn.apache.org/viewvc?rev=1353086&view=rev
Log:
using MultiFetch in the follower; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-339

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Sat Jun 23 07:14:24 2012
@@ -18,7 +18,6 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.server.{KafkaConfig, ReplicaFetcherThread}
 import java.lang.IllegalStateException
 import kafka.utils.Logging
 
@@ -28,7 +27,6 @@ class Replica(val brokerId: Int,
               var log: Option[Log] = None,
               var leoUpdateTime: Long = -1L) extends Logging {
   private var logEndOffset: Long = -1L
-  private var replicaFetcherThread: ReplicaFetcherThread = null
 
   def logEndOffset(newLeo: Option[Long] = None): Long = {
     isLocal match {
@@ -88,32 +86,6 @@ class Replica(val brokerId: Int,
     }
   }
 
-  def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
-    val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
-    replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
-    replicaFetcherThread.setDaemon(true)
-    replicaFetcherThread.start()
-  }
-
-  def stopReplicaFetcherThread() {
-    if(replicaFetcherThread != null) {
-      replicaFetcherThread.shutdown()
-      replicaFetcherThread = null
-    }
-  }
-
-  def getIfFollowerAndLeader(): (Boolean, Int) = {
-    replicaFetcherThread != null match {
-      case true => (true, replicaFetcherThread.getLeader().id)
-      case false => (false, -1)
-    }
-  }
-
-  def close() {
-    if(replicaFetcherThread != null)
-      replicaFetcherThread.shutdown()
-  }
-
   override def equals(that: Any): Boolean = {
     if(!(that.isInstanceOf[Replica]))
       return false

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Sat Jun 23 07:14:24 2012
@@ -60,6 +60,8 @@ object ErrorMapping {
   def maybeThrowException(code: Short) =
     if(code != 0)
       throw codeToException(code).newInstance()
+
+  /* Returns a new instance of the exception class mapped to the given error code, without throwing it
+   * (unlike maybeThrowException). Useful for attaching the mapped exception to a log statement. */
+  def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
 }
 
 class InvalidTopicException(message: String) extends RuntimeException(message) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Sat Jun 23 07:14:24 2012
@@ -405,6 +405,19 @@ private[kafka] class Log(val dir: File, 
     ret
   }
 
+  /**
+   *  Truncate all segments in the log and start a new segment whose file is named after, and whose
+   *  base offset is, newOffset. The previously existing segments are then passed to deleteSegments.
+   *  The whole operation runs under the log lock.
+   *  NOTE(review): if newOffset equals the base offset of an existing segment, the new segment's file
+   *  name collides with one being deleted -- confirm deleteSegments cannot remove the new file.
+   */
+  def truncateAndStartWithNewOffset(newOffset: Long) {
+    lock synchronized {
+      // presumably removes every segment from the segment list and returns the removed ones
+      val deletedSegments = segments.trunc(segments.view.size)
+      val newFile = new File(dir, Log.nameFromOffset(newOffset))
+      debug("truncate and start log '" + name + "' to " + newFile.getName())
+      segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+      deleteSegments(deletedSegments)
+    }
+  }
+
   /* Attemps to delete all provided segments from a log and returns how many it was able to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.mutable
+import kafka.utils.Logging
+import kafka.cluster.Broker
+
+/**
+ * Manages a pool of fetcher threads keyed by (source broker id, fetcher id). Each
+ * (topic, partition) is assigned to one of numReplicaFetchers fetcher slots per source broker.
+ * All access to the thread map happens under mapLock.
+ */
+abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
+  // map of (source broker id, fetcher id per source broker) => fetcher; only accessed under mapLock
+  private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
+  private val mapLock = new Object
+  this.logIdent = name + " "
+
+  /**
+   * Spread (topic, partition) pairs over numReplicaFetchers fetcher slots per source broker.
+   * String.hashCode can be negative (and the sum can overflow), and % keeps the sign of the
+   * dividend, so the remainder is folded into [0, numReplicaFetchers). Otherwise a negative id
+   * would create fetcher threads beyond the configured count.
+   */
+  private def getFetcherId(topic: String, partitionId: Int) : Int = {
+    math.abs((topic.hashCode() + 31 * partitionId) % numReplicaFetchers)
+  }
+
+  // to be defined in subclass to create a specific fetcher
+  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
+
+  /**
+   * Start fetching topic/partitionId from sourceBroker at initialOffset. This reuses the fetcher
+   * thread for the partition's slot on that broker, or creates and starts a new one.
+   */
+  def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
+    mapLock synchronized {
+      val key = (sourceBroker.id, getFetcherId(topic, partitionId))
+      val fetcherThread = fetcherThreadMap.get(key) match {
+        case Some(f) => f
+        case None =>
+          val f = createFetcherThread(key._2, sourceBroker)
+          fetcherThreadMap.put(key, f)
+          f.start
+          f
+      }
+      fetcherThread.addPartition(topic, partitionId, initialOffset)
+      info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
+          .format(topic, partitionId, initialOffset, sourceBroker.id, key._2))
+    }
+  }
+
+  /**
+   * Stop fetching topic/partitionId on every fetcher. Fetchers left with no partitions are
+   * shut down and dropped from the map.
+   */
+  def removeFetcher(topic: String, partitionId: Int) {
+    info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId))
+    mapLock synchronized {
+      for (fetcher <- fetcherThreadMap.values)
+        fetcher.removePartition(topic, partitionId)
+      // Snapshot the idle keys first: removing from a mutable.HashMap while iterating over it
+      // is unsafe and can skip entries.
+      val idleKeys = fetcherThreadMap.filter { case (_, fetcher) => fetcher.partitionCount <= 0 }.keys.toList
+      for (key <- idleKeys)
+        fetcherThreadMap.remove(key).foreach(_.shutdown)
+    }
+  }
+
+  /** Id of the source broker whose fetcher currently holds topic/partitionId, if any. */
+  def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
+    mapLock synchronized {
+      fetcherThreadMap.find { case (_, fetcher) => fetcher.hasPartition(topic, partitionId) }
+                      .map { case ((sourceBrokerId, _), _) => sourceBrokerId }
+    }
+  }
+
+  /** Shut down every fetcher thread and forget them, so no stopped fetcher is reported or reused. */
+  def shutdown() = {
+    info("shutting down")
+    mapLock synchronized {
+      for ( (_, fetcher) <- fetcherThreadMap) {
+        fetcher.shutdown
+      }
+      fetcherThreadMap.clear()
+    }
+    info("shutdown completes")
+  }
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.CountDownLatch
+import kafka.cluster.Broker
+import kafka.consumer.SimpleConsumer
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.Logging
+import kafka.common.ErrorMapping
+import kafka.api.{PartitionData, FetchRequestBuilder}
+import scala.collection.mutable
+import kafka.message.ByteBufferMessageSet
+
+/**
+ *  Abstract class for fetching data from multiple partitions from the same broker.
+ *
+ *  The thread repeatedly builds one fetch request covering every (topic, partition) in its fetch
+ *  map and sends it to sourceBroker. It then dispatches each partition's result to the callbacks
+ *  below. Partitions can be added and removed concurrently via addPartition / removePartition.
+ */
+abstract class AbstractFetcherThread(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+                                     fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
+  extends Thread(name) with Logging {
+  private val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+  private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
+  private val fetchMapLock = new Object
+  val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
+  this.logIdent = name + " "
+  info("starting")
+  // callbacks to be defined in subclass
+
+  // process fetched data for a partition; called under fetchMapLock. The next fetch offset is
+  // advanced by run() using the valid bytes of the returned message set, not returned from here.
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
+
+  // handle a partition whose offset is out of range and return a new fetch offset
+  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
+
+  // any logic for partitions whose leader has changed; called outside fetchMapLock
+  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit
+
+  override def run() {
+    try {
+      while(isRunning.get()) {
+        val builder = new FetchRequestBuilder().
+          clientId(name).
+          replicaId(fetcherBrokerId).
+          maxWait(maxWait).
+          minBytes(minBytes)
+
+        fetchMapLock synchronized {
+          for ( ((topic, partitionId), offset) <- fetchMap )
+            builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
+        }
+
+        val fetchRequest = builder.build()
+        // log before sending so the trace precedes the (possibly blocking) fetch
+        trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
+        val response = simpleConsumer.fetch(fetchRequest)
+
+        var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
+        // process fetched data
+        fetchMapLock synchronized {
+          for ( topicData <- response.data ) {
+            for ( partitionData <- topicData.partitionData) {
+              val topic = topicData.topic
+              val partitionId = partitionData.partition
+              val key = (topic, partitionId)
+              // partitions removed while this fetch was in flight have no entry and are skipped
+              fetchMap.get(key) match {
+                case Some(currentOffset) =>
+                  partitionData.error match {
+                    case ErrorMapping.NoError =>
+                      processPartitionData(topic, currentOffset, partitionData)
+                      val newOffset = currentOffset + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                      fetchMap.put(key, newOffset)
+                    case ErrorMapping.OffsetOutOfRangeCode =>
+                      val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                      fetchMap.put(key, newOffset)
+                      warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                        .format(currentOffset, topic, partitionId, newOffset))
+                    case ErrorMapping.NotLeaderForPartitionCode =>
+                      partitionsWithNewLeader ::= key
+                    case _ =>
+                      // %d needs an integer, so log the broker id. Formatting the host String with %d
+                      // makes format() throw IllegalFormatConversionException and stop this thread.
+                      error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                            ErrorMapping.exceptionFor(partitionData.error))
+                  }
+                case None =>
+              }
+            }
+          }
+        }
+        if (!partitionsWithNewLeader.isEmpty) {
+          debug("changing leaders for %s".format(partitionsWithNewLeader))
+          handlePartitionsWithNewLeader(partitionsWithNewLeader)
+        }
+      }
+    } catch {
+      case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down")
+      case e1 => error("error in replica fetcher runnable", e1)
+    }
+    shutdownComplete()
+  }
+
+  def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
+    fetchMapLock synchronized {
+      fetchMap.put((topic, partitionId), initialOffset)
+    }
+  }
+
+  def removePartition(topic: String, partitionId: Int) {
+    fetchMapLock synchronized {
+      fetchMap.remove((topic, partitionId))
+    }
+  }
+
+  def hasPartition(topic: String, partitionId: Int): Boolean = {
+    fetchMapLock synchronized {
+      fetchMap.get((topic, partitionId)).isDefined
+    }
+  }
+
+  def partitionCount() = {
+    fetchMapLock synchronized {
+      fetchMap.size
+    }
+  }
+  
+  // release the consumer and unblock any shutdown() caller waiting on the latch
+  private def shutdownComplete() = {
+    simpleConsumer.close()
+    shutdownLatch.countDown
+  }
+
+  // signal the loop to stop, interrupt any blocking call, and wait until run() has finished
+  def shutdown() {
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("shutdown completed")
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Jun 23 07:14:24 2012
@@ -136,4 +136,7 @@ class KafkaConfig(props: Properties) ext
 
   val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
 
+  /* Number of fetcher threads used to replicate messages from each source broker into this follower
+   * broker (partitions are spread over this many fetcher slots per source broker). Defaults to 1.
+   * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
+  val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
  }

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Broker
+
+/**
+ * Fetcher manager for follower replicas. Every fetcher thread it creates is a ReplicaFetcherThread
+ * that copies messages from the source (leader) broker into the replicas owned by replicaMgr.
+ * The number of fetcher slots per source broker comes from brokerConfig.numReplicaFetchers.
+ */
+class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
+        extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
+
+  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+    // the thread name encodes both the source broker and the fetcher slot on that broker
+    val threadName = "ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId)
+    new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig, replicaMgr)
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Sat Jun 23 07:14:24 2012
@@ -17,67 +17,41 @@
 
 package kafka.server
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
-import kafka.api.FetchRequestBuilder
-import kafka.utils.Logging
-import kafka.cluster.{Broker, Replica}
-import kafka.consumer.SimpleConsumer
-
-class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
-  extends Thread(name) with Logging {
-  val isRunning: AtomicBoolean = new AtomicBoolean(true)
-  private val shutdownLatch = new CountDownLatch(1)
-  private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
-    config.replicaSocketTimeoutMs, config.replicaSocketBufferSize)
-
-  override def run() {
-    try {
-      info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
-      while(isRunning.get()) {
-        val builder = new FetchRequestBuilder().
-          clientId(name).
-          replicaId(replica.brokerId).
-          maxWait(config.replicaMaxWaitTimeMs).
-          minBytes(config.replicaMinBytes)
-
-        // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message
-        // replication actually works
-        val fetchOffset = replica.logEndOffset()
-        trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
-          .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
-        builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize)
-
-        val fetchRequest = builder.build()
-        val response = replicaConsumer.fetch(fetchRequest)
-        // TODO: KAFKA-339 Check for error. Don't blindly read the messages
-        // append messages to local log
-        replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
-        // record the hw sent by the leader for this partition
-        val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
-        replica.highWatermark(Some(followerHighWatermark))
-        trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
-          .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
-      }
-    }catch {
-      case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
-      case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
-    }
-    shutdownComplete()
+import kafka.api.{OffsetRequest, PartitionData}
+import kafka.cluster.Broker
+import kafka.message.ByteBufferMessageSet
+
+class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
+        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+          socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
+          fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
+          minBytes = brokerConfig.replicaMinBytes) {
+
+  // Append the fetched messages to the local replica's log and update its high watermark.
+  // Throws if fetchOffset differs from the local log end offset. The next fetch offset is
+  // advanced by AbstractFetcherThread.run, not returned from here.
+  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = {
+    val partitionId = partitionData.partition
+    val replica = replicaMgr.getReplica(topic, partitionId).get
+    val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+
+    val logEndOffset = replica.logEndOffset()
+    if (fetchOffset != logEndOffset)
+      throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, logEndOffset))
+    replica.log.get.append(messageSet)
+    // The leader's HW can be ahead of what this follower has appended so far. Cap it at the local
+    // log end offset (as the previous single-partition fetcher did) so the follower never reports
+    // messages it does not have as committed.
+    val followerHighWatermark = replica.logEndOffset().min(partitionData.hw)
+    replica.highWatermark(Some(followerHighWatermark))
+    trace("follower %d set replica highwatermark for topic %s partition %d to %d"
+          .format(replica.brokerId, topic, partitionId, followerHighWatermark))
   }
 
-  private def shutdownComplete() = {
-    replicaConsumer.close()
-    shutdownLatch.countDown
+  // Reset a partition whose fetch offset is out of range. The local replica is treated as out of
+  // date: its log is truncated and restarted at the leader's earliest offset, which is also
+  // returned as the new fetch offset.
+  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+    val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1)
+    val localLog = replicaMgr.getReplica(topic, partitionId).get.log.get
+    val restartOffset = offsets(0)
+    localLog.truncateAndStartWithNewOffset(restartOffset)
+    restartOffset
   }
 
-  def getLeader(): Broker = leaderBroker
-
-  def shutdown() {
-    info("Shutting down replica fetcher thread")
-    isRunning.set(false)
-    interrupt()
-    shutdownLatch.await()
-    info("Replica fetcher thread shutdown completed")
+  // any logic for partitions whose leader has changed
+  // Intentionally a no-op: partitions that returned NotLeaderForPartition stay in this fetcher's map.
+  // The follower is expected to be re-pointed externally (ReplicaManager.makeLeader/makeFollower
+  // remove and re-add fetchers through the fetcher manager).
+  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
+    // no handler needed since the controller will make the changes accordingly
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sat Jun 23 07:14:24 2012
@@ -32,6 +32,8 @@ class ReplicaManager(val config: KafkaCo
   private var leaderReplicas = new ListBuffer[Partition]()
   private val leaderReplicaLock = new ReentrantLock()
   private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+  private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+
   // start ISR expiration thread
   isrExpirationScheduler.startUp
   isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
@@ -139,7 +141,7 @@ class ReplicaManager(val config: KafkaCo
 
   def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
     // stop replica fetcher thread, if any
-    replica.stopReplicaFetcherThread()
+    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
     // read and cache the ISR
     replica.partition.leaderId(Some(replica.brokerId))
     replica.partition.updateISR(currentISRInZk.toSet)
@@ -153,7 +155,7 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
-    info("Broker %d becoming follower to leader %d for topic %s partition %d"
+    info("broker %d intending to follow leader %d for topic %s partition %d"
       .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
     // remove this replica's partition from the ISR expiration queue
     try {
@@ -169,13 +171,15 @@ class ReplicaManager(val config: KafkaCo
     }
     // get leader for this replica
     val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
-    val isReplicaAFollower = replica.getIfFollowerAndLeader()
+    val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
     // Become follower only if it is not already following the same leader
-    if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) {
+    if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
+      info("broker %d becoming follower to leader %d for topic %s partition %d"
+        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
       // stop fetcher thread to previous leader
-      replica.stopReplicaFetcherThread()
+      replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
       // start fetcher thread to current leader
-      replica.startReplicaFetcherThread(leaderBroker, config)
+      replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
     }
   }
 
@@ -244,6 +248,6 @@ class ReplicaManager(val config: KafkaCo
 
   def close() {
     isrExpirationScheduler.shutdown()
-    allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
+    replicaFetcherManager.shutdown()
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Sat Jun 23 07:14:24 2012
@@ -23,17 +23,15 @@ import kafka.utils.TestUtils._
 import kafka.producer.ProducerData
 import kafka.serializer.StringEncoder
 import kafka.admin.CreateTopicCommand
-import kafka.cluster.{Replica, Partition, Broker}
-import kafka.utils.{MockTime, TestUtils}
+import kafka.utils.TestUtils
 import junit.framework.Assert._
-import java.io.File
-import kafka.log.Log
 
 class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness  {
   val props = createBrokerConfigs(2)
   val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
   var brokers: Seq[KafkaServer] = null
-  val topic = "foobar"
+  val topic1 = "foo"
+  val topic2 = "bar"
 
   override def setUp() {
     super.setUp()
@@ -41,45 +39,36 @@ class ReplicaFetchTest extends JUnit3Sui
   }
 
   override def tearDown() {
-    super.tearDown()
     brokers.foreach(_.shutdown())
+    super.tearDown()
   }
 
   def testReplicaFetcherThread() {
     val partition = 0
-    val testMessageList = List("test1", "test2", "test3", "test4")
-    val leaderBrokerId = configs.head.brokerId
-    val followerBrokerId = configs.last.brokerId
-    val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+    val testMessageList1 = List("test1", "test2", "test3", "test4")
+    val testMessageList2 = List("test5", "test6", "test7", "test8")
 
     // create a topic and partition and await leadership
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
-    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+    for (topic <- List(topic1,topic2)) {
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+    }
 
     // send test messages to leader
     val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
-    producer.send(new ProducerData[String, String](topic, "test", testMessageList))
-
-    // create a tmp directory
-    val tmpLogDir = TestUtils.tempDir()
-    val replicaLogDir = new File(tmpLogDir, topic + "-" + partition)
-    replicaLogDir.mkdirs()
-    val replicaLog = new Log(replicaLogDir, 500, 500, false)
-
-    // create replica fetch thread
-    val time = new MockTime
-    val testPartition = new Partition(topic, partition, time)
-    testPartition.leaderId(Some(leaderBrokerId))
-    val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
-    val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
-
-    // start a replica fetch thread to the above broker
-    replicaFetchThread.start()
-
-    Thread.sleep(700)
-    replicaFetchThread.shutdown()
-
-    assertEquals(60L, testReplica.log.get.logEndOffset)
-    replicaLog.close()
+    producer.send(new ProducerData[String, String](topic1, testMessageList1),
+                  new ProducerData[String, String](topic2, testMessageList2))
+    producer.close()
+
+    def condition(): Boolean = {
+      var result = true
+      for (topic <- List(topic1, topic2)) {
+        val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
+        result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
+          (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
+      }
+      result
+    }
+    assertTrue("broker logs should be identical", waitUntilTrue(condition, 6000))
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Jun 23 07:14:24 2012
@@ -397,6 +397,18 @@ object TestUtils extends Logging {
     }
   }
 
+  /**
+   * Poll condition every 100 ms until it holds or waitTime ms have elapsed since the call.
+   * The condition is always evaluated at least once.
+   * @return true as soon as condition() returns true, false once the deadline has passed
+   */
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+    val deadline = System.currentTimeMillis() + waitTime
+    var satisfied = condition()
+    while (!satisfied && System.currentTimeMillis() <= deadline) {
+      Thread.sleep(100)
+      satisfied = condition()
+    }
+    satisfied
+  }
 }
 
 object ControllerTestUtils{
@@ -442,9 +454,6 @@ object ControllerTestUtils{
   }
 }
 
-
-
-
 object TestZKUtils {
   val zookeeperConnect = "127.0.0.1:2182"  
 }