You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/23 09:14:25 UTC
svn commit: r1353086 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/log/
main/scala/kafka/server/ test/scala/unit/kafka/server/
test/scala/unit/kafka/utils/
Author: junrao
Date: Sat Jun 23 07:14:24 2012
New Revision: 1353086
URL: http://svn.apache.org/viewvc?rev=1353086&view=rev
Log:
using MultiFetch in the follower; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-339
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Sat Jun 23 07:14:24 2012
@@ -18,7 +18,6 @@
package kafka.cluster
import kafka.log.Log
-import kafka.server.{KafkaConfig, ReplicaFetcherThread}
import java.lang.IllegalStateException
import kafka.utils.Logging
@@ -28,7 +27,6 @@ class Replica(val brokerId: Int,
var log: Option[Log] = None,
var leoUpdateTime: Long = -1L) extends Logging {
private var logEndOffset: Long = -1L
- private var replicaFetcherThread: ReplicaFetcherThread = null
def logEndOffset(newLeo: Option[Long] = None): Long = {
isLocal match {
@@ -88,32 +86,6 @@ class Replica(val brokerId: Int,
}
}
- def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
- val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
- replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
- replicaFetcherThread.setDaemon(true)
- replicaFetcherThread.start()
- }
-
- def stopReplicaFetcherThread() {
- if(replicaFetcherThread != null) {
- replicaFetcherThread.shutdown()
- replicaFetcherThread = null
- }
- }
-
- def getIfFollowerAndLeader(): (Boolean, Int) = {
- replicaFetcherThread != null match {
- case true => (true, replicaFetcherThread.getLeader().id)
- case false => (false, -1)
- }
- }
-
- def close() {
- if(replicaFetcherThread != null)
- replicaFetcherThread.shutdown()
- }
-
override def equals(that: Any): Boolean = {
if(!(that.isInstanceOf[Replica]))
return false
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Sat Jun 23 07:14:24 2012
@@ -60,6 +60,8 @@ object ErrorMapping {
def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()
+
+ def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
}
class InvalidTopicException(message: String) extends RuntimeException(message) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Sat Jun 23 07:14:24 2012
@@ -405,6 +405,19 @@ private[kafka] class Log(val dir: File,
ret
}
+ /**
+ * Truncate all segments in the log and start a new segment on a new offset; old segments are deleted last, under the log lock
+ */
+ def truncateAndStartWithNewOffset(newOffset: Long) {
+ lock synchronized {
+ val deletedSegments = segments.trunc(segments.view.size)
+ val newFile = new File(dir, Log.nameFromOffset(newOffset))
+ debug("tuncate and start log '" + name + "' to " + newFile.getName())
+ segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
+ deleteSegments(deletedSegments) // NOTE(review): assumes no old segment file has the same name as newFile - confirm
+ }
+ }
+
/* Attemps to delete all provided segments from a log and returns how many it was able to */
def deleteSegments(segments: Seq[LogSegment]): Int = {
var total = 0
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.mutable
+import kafka.utils.Logging
+import kafka.cluster.Broker
+
+abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
+ // map of (source brokerid, fetcher Id per source broker) => fetcher
+ private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
+ private val mapLock = new Object
+ this.logIdent = name + " "
+
+ // hashCode() may be negative, so fold the remainder back into [0, numReplicaFetchers)
+ private def getFetcherId(topic: String, partitionId: Int) : Int =
+   (((topic.hashCode() + 31 * partitionId) % numReplicaFetchers) + numReplicaFetchers) % numReplicaFetchers
+
+ // to be defined in subclass to create a specific fetcher
+ def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread
+
+ def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
+   mapLock synchronized {
+     val key = (sourceBroker.id, getFetcherId(topic, partitionId))
+     val fetcherThread = fetcherThreadMap.get(key) match { // reuse the thread registered under this key, if any
+       case Some(existing) => existing
+       case None => // otherwise create one, register it and start it
+         val created = createFetcherThread(key._2, sourceBroker)
+         fetcherThreadMap.put(key, created)
+         created.start()
+         created
+     }
+     fetcherThread.addPartition(topic, partitionId, initialOffset)
+     info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
+       .format(topic, partitionId, initialOffset, sourceBroker.id, key._2))
+   }
+ }
+
+ def removeFetcher(topic: String, partitionId: Int) {
+   info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId))
+   mapLock synchronized {
+     for ((key, fetcher) <- fetcherThreadMap.toList) { // snapshot: the live map is modified inside the loop
+       fetcher.removePartition(topic, partitionId)
+       if (fetcher.partitionCount <= 0) { // nothing left to fetch: stop the thread and forget it
+         fetcher.shutdown()
+         fetcherThreadMap.remove(key)
+       }
+     }
+   }
+ }
+
+ def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
+   mapLock synchronized {
+     // the first fetcher found to hold the partition identifies its source broker; None if no fetcher has it
+     fetcherThreadMap
+       .find { case (_, fetcher) => fetcher.hasPartition(topic, partitionId) }
+       .map { case ((sourceBrokerId, _), _) => sourceBrokerId }
+   }
+ }
+
+ // Shuts down every fetcher thread currently registered with this manager.
+ def shutdown() = {
+   info("shutting down")
+   mapLock synchronized {
+     // stop the fetchers one after another; the map itself is left untouched
+     fetcherThreadMap.values.foreach(_.shutdown())
+   }
+   info("shutdown completes")
+ }
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.CountDownLatch
+import kafka.cluster.Broker
+import kafka.consumer.SimpleConsumer
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.utils.Logging
+import kafka.common.ErrorMapping
+import kafka.api.{PartitionData, FetchRequestBuilder}
+import scala.collection.mutable
+import kafka.message.ByteBufferMessageSet
+
+/**
+ * Abstract class for fetching data from multiple partitions from the same broker.
+ */
+abstract class AbstractFetcherThread(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+ fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
+ extends Thread(name) with Logging {
+ private val isRunning: AtomicBoolean = new AtomicBoolean(true)
+ private val shutdownLatch = new CountDownLatch(1)
+ private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
+ private val fetchMapLock = new Object
+ val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
+ this.logIdent = name + " "
+ info("starting")
+ // callbacks to be defined in subclass
+
+ // process fetched data; run() itself advances the fetch offset by the valid bytes of the fetched message set
+ def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)
+
+ // handle a partition whose offset is out of range and return a new fetch offset
+ def handleOffsetOutOfRange(topic: String, partitionId: Int): Long
+
+ // any logic for partitions whose leader has changed
+ def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit
+
+ override def run() {
+   try {
+     while(isRunning.get()) {
+       val builder = new FetchRequestBuilder().
+         clientId(name).
+         replicaId(fetcherBrokerId).
+         maxWait(maxWait).
+         minBytes(minBytes)
+
+       fetchMapLock synchronized { // one fetch entry per partition currently held in fetchMap
+         for ( ((topic, partitionId), offset) <- fetchMap )
+           builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
+       }
+
+       val fetchRequest = builder.build()
+       trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) // log before sending
+       val response = simpleConsumer.fetch(fetchRequest)
+
+       var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
+       // process fetched data; partitions no longer present in fetchMap when the response arrives are skipped
+       fetchMapLock synchronized {
+         for ( topicData <- response.data ) {
+           for ( partitionData <- topicData.partitionData) {
+             val topic = topicData.topic
+             val partitionId = partitionData.partition
+             val key = (topic, partitionId)
+             val currentOffset = fetchMap.get(key)
+             if (currentOffset.isDefined) {
+               partitionData.error match {
+                 case ErrorMapping.NoError =>
+                   processPartitionData(topic, currentOffset.get, partitionData)
+                   val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                   fetchMap.put(key, newOffset)
+                 case ErrorMapping.OffsetOutOfRangeCode =>
+                   val newOffset = handleOffsetOutOfRange(topic, partitionId)
+                   fetchMap.put(key, newOffset)
+                   warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                     .format(currentOffset.get, topic, partitionId, newOffset))
+                 case ErrorMapping.NotLeaderForPartitionCode =>
+                   partitionsWithNewLeader ::= key
+                 case _ => // host is a String, so %d must be given the numeric broker id
+                   error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                     ErrorMapping.exceptionFor(partitionData.error))
+               }
+             }
+           }
+         }
+       }
+       if (partitionsWithNewLeader.size > 0) {
+         debug("changing leaders for %s".format(partitionsWithNewLeader))
+         handlePartitionsWithNewLeader(partitionsWithNewLeader)
+       }
+     }
+   } catch {
+     case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down")
+     case e1 => error("error in replica fetcher runnable", e1) // any other failure also ends the loop and the thread
+   }
+   shutdownComplete()
+ }
+
+ def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
+ fetchMapLock synchronized {
+ fetchMap.put((topic, partitionId), initialOffset)
+ }
+ }
+
+ def removePartition(topic: String, partitionId: Int) {
+ fetchMapLock synchronized {
+ fetchMap.remove((topic, partitionId))
+ }
+ }
+
+ def hasPartition(topic: String, partitionId: Int): Boolean = {
+   fetchMapLock synchronized {
+     fetchMap.contains((topic, partitionId)) // true while the partition has an entry in fetchMap
+   }
+ }
+
+ def partitionCount() = {
+ fetchMapLock synchronized {
+ fetchMap.size
+ }
+ }
+
+ private def shutdownComplete() = {
+ simpleConsumer.close()
+ shutdownLatch.countDown
+ }
+
+ def shutdown() { // stops the fetch loop and blocks until run() has finished its cleanup
+ isRunning.set(false) // run() re-checks this flag at the top of every loop iteration
+ interrupt() // wake the thread in case it is blocked in an interruptible call
+ shutdownLatch.await() // released by shutdownComplete() at the end of run()
+ info("shutdown completed")
+ }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Sat Jun 23 07:14:24 2012
@@ -136,4 +136,7 @@ class KafkaConfig(props: Properties) ext
val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)
+ /* number of fetcher threads used to replicate messages from a source broker.
+ * Increasing this value can increase the degree of I/O parallelism in the follower broker. */
+ val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1353086&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Sat Jun 23 07:14:24 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.Broker
+
+class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
+  extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
+  // builds a ReplicaFetcherThread whose name carries the source broker id and the fetcher id
+  def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
+    val threadName = "ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId)
+    new ReplicaFetcherThread(threadName, sourceBroker, brokerConfig, replicaMgr)
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Sat Jun 23 07:14:24 2012
@@ -17,67 +17,41 @@
package kafka.server
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
-import kafka.api.FetchRequestBuilder
-import kafka.utils.Logging
-import kafka.cluster.{Broker, Replica}
-import kafka.consumer.SimpleConsumer
-
-class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig)
- extends Thread(name) with Logging {
- val isRunning: AtomicBoolean = new AtomicBoolean(true)
- private val shutdownLatch = new CountDownLatch(1)
- private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port,
- config.replicaSocketTimeoutMs, config.replicaSocketBufferSize)
-
- override def run() {
- try {
- info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId))
- while(isRunning.get()) {
- val builder = new FetchRequestBuilder().
- clientId(name).
- replicaId(replica.brokerId).
- maxWait(config.replicaMaxWaitTimeMs).
- minBytes(config.replicaMinBytes)
-
- // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message
- // replication actually works
- val fetchOffset = replica.logEndOffset()
- trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d"
- .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset))
- builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize)
-
- val fetchRequest = builder.build()
- val response = replicaConsumer.fetch(fetchRequest)
- // TODO: KAFKA-339 Check for error. Don't blindly read the messages
- // append messages to local log
- replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId))
- // record the hw sent by the leader for this partition
- val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw)
- replica.highWatermark(Some(followerHighWatermark))
- trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
- .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark()))
- }
- }catch {
- case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name))
- case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1)
- }
- shutdownComplete()
+import kafka.api.{OffsetRequest, PartitionData}
+import kafka.cluster.Broker
+import kafka.message.ByteBufferMessageSet
+
+class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
+ extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+ socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
+ fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
+ minBytes = brokerConfig.replicaMinBytes) {
+
+ // append the fetched messages to the local replica log and record the high watermark sent with the data
+ def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = {
+ val partitionId = partitionData.partition
+ val replica = replicaMgr.getReplica(topic, partitionId).get
+ val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+
+ if (fetchOffset != replica.logEndOffset())
+ throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset()))
+ replica.log.get.append(messageSet)
+ replica.highWatermark(Some(partitionData.hw))
+ trace("follower %d set replica highwatermark for topic %s partition %d to %d"
+ .format(replica.brokerId, topic, partitionId, partitionData.hw))
}
- private def shutdownComplete() = {
- replicaConsumer.close()
- shutdownLatch.countDown
+ // handle a partition whose offset is out of range and return a new fetch offset
+ def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
+ // This means the local replica is out of date. Truncate the log and catch up from beginning.
+ val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1)
+ val replica = replicaMgr.getReplica(topic, partitionId).get
+ replica.log.get.truncateAndStartWithNewOffset(offsets(0))
+ return offsets(0)
}
- def getLeader(): Broker = leaderBroker
-
- def shutdown() {
- info("Shutting down replica fetcher thread")
- isRunning.set(false)
- interrupt()
- shutdownLatch.await()
- info("Replica fetcher thread shutdown completed")
+ // any logic for partitions whose leader has changed
+ def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {
+ // no handler needed since the controller will make the changes accordingly
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Sat Jun 23 07:14:24 2012
@@ -32,6 +32,8 @@ class ReplicaManager(val config: KafkaCo
private var leaderReplicas = new ListBuffer[Partition]()
private val leaderReplicaLock = new ReentrantLock()
private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
+ private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+
// start ISR expiration thread
isrExpirationScheduler.startUp
isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
@@ -139,7 +141,7 @@ class ReplicaManager(val config: KafkaCo
def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
// stop replica fetcher thread, if any
- replica.stopReplicaFetcherThread()
+ replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
replica.partition.updateISR(currentISRInZk.toSet)
@@ -153,7 +155,7 @@ class ReplicaManager(val config: KafkaCo
}
def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
- info("Broker %d becoming follower to leader %d for topic %s partition %d"
+ info("broker %d intending to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// remove this replica's partition from the ISR expiration queue
try {
@@ -169,13 +171,15 @@ class ReplicaManager(val config: KafkaCo
}
// get leader for this replica
val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
- val isReplicaAFollower = replica.getIfFollowerAndLeader()
+ val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
// Become follower only if it is not already following the same leader
- if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) {
+ if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
+ info("broker %d becoming follower to leader %d for topic %s partition %d"
+ .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
// stop fetcher thread to previous leader
- replica.stopReplicaFetcherThread()
+ replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// start fetcher thread to current leader
- replica.startReplicaFetcherThread(leaderBroker, config)
+ replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
}
}
@@ -244,6 +248,6 @@ class ReplicaManager(val config: KafkaCo
def close() {
isrExpirationScheduler.shutdown()
- allReplicas.foreach(_._2.assignedReplicas().foreach(_.close()))
+ replicaFetcherManager.shutdown()
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Sat Jun 23 07:14:24 2012
@@ -23,17 +23,15 @@ import kafka.utils.TestUtils._
import kafka.producer.ProducerData
import kafka.serializer.StringEncoder
import kafka.admin.CreateTopicCommand
-import kafka.cluster.{Replica, Partition, Broker}
-import kafka.utils.{MockTime, TestUtils}
+import kafka.utils.TestUtils
import junit.framework.Assert._
-import java.io.File
-import kafka.log.Log
class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(2)
val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1})
var brokers: Seq[KafkaServer] = null
- val topic = "foobar"
+ val topic1 = "foo"
+ val topic2 = "bar"
override def setUp() {
super.setUp()
@@ -41,45 +39,36 @@ class ReplicaFetchTest extends JUnit3Sui
}
override def tearDown() {
- super.tearDown()
brokers.foreach(_.shutdown())
+ super.tearDown()
}
def testReplicaFetcherThread() {
val partition = 0
- val testMessageList = List("test1", "test2", "test3", "test4")
- val leaderBrokerId = configs.head.brokerId
- val followerBrokerId = configs.last.brokerId
- val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
+ val testMessageList1 = List("test1", "test2", "test3", "test4")
+ val testMessageList2 = List("test5", "test6", "test7", "test8")
// create a topic and partition and await leadership
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
- TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+ for (topic <- List(topic1,topic2)) {
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+ }
// send test messages to leader
val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
- producer.send(new ProducerData[String, String](topic, "test", testMessageList))
-
- // create a tmp directory
- val tmpLogDir = TestUtils.tempDir()
- val replicaLogDir = new File(tmpLogDir, topic + "-" + partition)
- replicaLogDir.mkdirs()
- val replicaLog = new Log(replicaLogDir, 500, 500, false)
-
- // create replica fetch thread
- val time = new MockTime
- val testPartition = new Partition(topic, partition, time)
- testPartition.leaderId(Some(leaderBrokerId))
- val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog))
- val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last)
-
- // start a replica fetch thread to the above broker
- replicaFetchThread.start()
-
- Thread.sleep(700)
- replicaFetchThread.shutdown()
-
- assertEquals(60L, testReplica.log.get.logEndOffset)
- replicaLog.close()
+ producer.send(new ProducerData[String, String](topic1, testMessageList1),
+ new ProducerData[String, String](topic2, testMessageList2))
+ producer.close()
+
+ def condition(): Boolean = {
+ var result = true
+ for (topic <- List(topic1, topic2)) {
+ val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset
+ result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total &&
+ (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) }
+ }
+ result
+ }
+ assertTrue("broker logs should be identical", waitUntilTrue(condition, 6000))
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1353086&r1=1353085&r2=1353086&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Jun 23 07:14:24 2012
@@ -397,6 +397,18 @@ object TestUtils extends Logging {
}
}
+ // Polls `condition`, sleeping 100 ms between attempts; returns true as soon as it holds,
+ // or false once a failed attempt finds that more than waitTime ms have elapsed since the call started.
+ def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+   val deadline = System.currentTimeMillis() + waitTime
+   var satisfied = condition()
+   // the clock is consulted only after a failed check, so the condition is always tried at least once
+   while (!satisfied && System.currentTimeMillis() <= deadline) {
+     Thread.sleep(100)
+     satisfied = condition()
+   }
+   satisfied
+ }
}
object ControllerTestUtils{
@@ -442,9 +454,6 @@ object ControllerTestUtils{
}
}
-
-
-
object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}