Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/23 02:22:40 UTC
git commit: kafka-1410; MetadataCache cleanup; patched by Jun Rao; reviewed by Timothy Chen, Joel Koshy
Repository: kafka
Updated Branches:
refs/heads/trunk ed68ba402 -> caafc9d61
kafka-1410; MetadataCache cleanup; patched by Jun Rao; reviewed by Timothy Chen, Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/caafc9d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/caafc9d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/caafc9d6
Branch: refs/heads/trunk
Commit: caafc9d614dcb8584fbaf01902fbeaf2d5ea6786
Parents: ed68ba4
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 22 17:22:27 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 22 17:22:27 2014 -0700
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 186 +++----------------
.../main/scala/kafka/server/MetadataCache.scala | 151 +++++++++++++++
.../scala/kafka/server/ReplicaManager.scala | 20 +-
.../unit/kafka/server/SimpleFetchTest.scala | 4 +-
4 files changed, 194 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a4ffce..bb0359d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,12 +29,8 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.common._
import kafka.utils.{Pool, SystemTime, Logging}
import kafka.network.RequestChannel.Response
-import kafka.cluster.Broker
import kafka.controller.KafkaController
-import kafka.utils.Utils.inLock
import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.controller.KafkaController.StateChangeLogger
/**
* Logic to handle the various Kafka requests
@@ -52,80 +48,9 @@ class KafkaApis(val requestChannel: RequestChannel,
private val fetchRequestPurgatory =
new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
private val delayedRequestMetrics = new DelayedRequestMetrics
- /* following 3 data structures are updated by the update metadata request
- * and is queried by the topic metadata request. */
var metadataCache = new MetadataCache
- private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
- private val partitionMetadataLock = new ReentrantReadWriteLock()
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
- class MetadataCache {
- private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
- new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-
- def addPartitionInfo(topic: String,
- partitionId: Int,
- stateInfo: PartitionStateInfo) {
- cache.get(topic) match {
- case Some(infos) => infos.put(partitionId, stateInfo)
- case None => {
- val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
- cache.put(topic, newInfos)
- newInfos.put(partitionId, stateInfo)
- }
- }
- }
-
- def removePartitionInfo(topic: String, partitionId: Int) = {
- cache.get(topic) match {
- case Some(infos) => {
- infos.remove(partitionId)
- if(infos.isEmpty) {
- cache.remove(topic)
- }
- true
- }
- case None => false
- }
- }
-
- def getPartitionInfos(topic: String) = cache(topic)
-
- def containsTopicAndPartition(topic: String,
- partitionId: Int): Boolean = {
- cache.get(topic) match {
- case Some(partitionInfos) => partitionInfos.contains(partitionId)
- case None => false
- }
- }
-
- def allTopics = cache.keySet
-
- def removeTopic(topic: String) = cache.remove(topic)
-
- def containsTopic(topic: String) = cache.contains(topic)
-
- def updateCache(updateMetadataRequest: UpdateMetadataRequest,
- brokerId: Int,
- stateChangeLogger: StateChangeLogger) = {
- updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
- if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
- removePartitionInfo(tp.topic, tp.partition)
- stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
- "sent by controller %d epoch %d with correlation id %d")
- .format(brokerId, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
- } else {
- addPartitionInfo(tp.topic, tp.partition, info)
- stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
- "sent by controller %d epoch %d with correlation id %d")
- .format(brokerId, info, tp, updateMetadataRequest.controllerId,
- updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
- }
- }
- }
- }
-
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
@@ -154,14 +79,6 @@ class KafkaApis(val requestChannel: RequestChannel,
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
- // ensureTopicExists is only for client facing requests
- private def ensureTopicExists(topic: String) = {
- inLock(partitionMetadataLock.readLock()) {
- if (!metadataCache.containsTopic(topic))
- throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
- }
- }
-
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
// ensureTopicExists is only for client facing requests
// We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -191,24 +108,8 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
- // ensureTopicExists is only for client facing requests
- // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
- // stop serving data to clients for the topic being deleted
- val stateChangeLogger = replicaManager.stateChangeLogger
- if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
- val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
- "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
- updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
- replicaManager.controllerEpoch)
- stateChangeLogger.warn(stateControllerEpochErrorMessage)
- throw new ControllerMovedException(stateControllerEpochErrorMessage)
- }
- inLock(partitionMetadataLock.writeLock()) {
- replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
- // cache the list of alive brokers in the cluster
- updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
- metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
- }
+ replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
+
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
}
@@ -388,7 +289,6 @@ class KafkaApis(val requestChannel: RequestChannel,
BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
try {
- ensureTopicExists(topicAndPartition.topic)
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val info =
partitionOpt match {
@@ -491,7 +391,6 @@ class KafkaApis(val requestChannel: RequestChannel,
case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
val partitionData =
try {
- ensureTopicExists(topic)
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -562,7 +461,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val responseMap = offsetRequest.requestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
try {
- ensureTopicExists(topicAndPartition.topic)
// ensure leader exists
val localReplica = if(!offsetRequest.isFromDebuggingClient)
replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -658,69 +556,33 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
- val config = replicaManager.config
-
- // Returning all topics when requested topics are empty
- val isAllTopics = topics.isEmpty
- val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
- val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
-
- inLock(partitionMetadataLock.readLock()) {
- val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
- for (topic <- topicsRequested) {
- if (isAllTopics || metadataCache.containsTopic(topic)) {
- val partitionStateInfos = metadataCache.getPartitionInfos(topic)
- val partitionMetadata = partitionStateInfos.map {
- case (partitionId, partitionState) =>
- val replicas = partitionState.allReplicas
- val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
- var leaderInfo: Option[Broker] = None
- var isrInfo: Seq[Broker] = Nil
- val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
- val leader = leaderIsrAndEpoch.leaderAndIsr.leader
- val isr = leaderIsrAndEpoch.leaderAndIsr.isr
- debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
- try {
- leaderInfo = aliveBrokers.get(leader)
- if (!leaderInfo.isDefined)
- throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
- isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
- if (replicaInfo.size < replicas.size)
- throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
- replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
- if (isrInfo.size < isr.size)
- throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
- isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
- new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
- } catch {
- case e: Throwable =>
- debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
- new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
- ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }
+ val topicResponses = metadataCache.getTopicMetadata(topics)
+ if (topics.size > 0 && topicResponses.size != topics.size) {
+ val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+ val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+ if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+ try {
+ if (topic == OffsetManager.OffsetsTopicName) {
+ AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+ offsetManager.offsetsTopicConfig)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+ .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+ }
+ else {
+ AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+ .format(topic, config.numPartitions, config.defaultReplicationFactor))
+ }
+ } catch {
+ case e: TopicExistsException => // let it go, possibly another broker created this topic
}
- topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
- } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
- topicsToBeCreated += topic
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
} else {
- topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+ new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
}
}
+ topicResponses.appendAll(responsesForNonExistentTopics)
}
-
- topicResponses.appendAll(topicsToBeCreated.map { topic =>
- try {
- if (topic == OffsetManager.OffsetsTopicName)
- AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
- else
- AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
- info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
- } catch {
- case e: TopicExistsException => // let it go, possibly another broker created this topic
- }
- new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
- })
-
topicResponses
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
new file mode 100644
index 0000000..a8b7bf7
--- /dev/null
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.{Seq, Set, mutable}
+import kafka.api._
+import kafka.cluster.Broker
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.utils.Utils._
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
+import kafka.common.TopicAndPartition
+import kafka.controller.KafkaController.StateChangeLogger
+import scala.Some
+
+/**
+ * A cache for the state (e.g., current leader) of each partition. This cache is updated through
+ * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
+ */
+private[server] class MetadataCache {
+ private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+ new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+ private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+ private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+ def getTopicMetadata(topics: Set[String]) = {
+ val isAllTopics = topics.isEmpty
+ val topicsRequested = if(isAllTopics) cache.keySet else topics
+ val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+ inLock(partitionMetadataLock.readLock()) {
+ for (topic <- topicsRequested) {
+ if (isAllTopics || cache.contains(topic)) {
+ val partitionStateInfos = cache(topic)
+ val partitionMetadata = partitionStateInfos.map {
+ case (partitionId, partitionState) =>
+ val replicas = partitionState.allReplicas
+ val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+ var leaderInfo: Option[Broker] = None
+ var isrInfo: Seq[Broker] = Nil
+ val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+ val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+ val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+ val topicPartition = TopicAndPartition(topic, partitionId)
+ try {
+ leaderInfo = aliveBrokers.get(leader)
+ if (!leaderInfo.isDefined)
+ throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
+ isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+ if (replicaInfo.size < replicas.size)
+ throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+ replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+ if (isrInfo.size < isr.size)
+ throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+ isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+ new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+ } catch {
+ case e: Throwable =>
+ debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+ new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+ ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ }
+ topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+ }
+ }
+ }
+ topicResponses
+ }
+
+ def addOrUpdatePartitionInfo(topic: String,
+ partitionId: Int,
+ stateInfo: PartitionStateInfo) {
+ inLock(partitionMetadataLock.writeLock()) {
+ cache.get(topic) match {
+ case Some(infos) => infos.put(partitionId, stateInfo)
+ case None => {
+ val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+ cache.put(topic, newInfos)
+ newInfos.put(partitionId, stateInfo)
+ }
+ }
+ }
+ }
+
+ def getPartitionInfos(topic: String) = {
+ inLock(partitionMetadataLock.readLock()) {
+ cache(topic)
+ }
+ }
+
+ def containsTopicAndPartition(topic: String,
+ partitionId: Int): Boolean = {
+ inLock(partitionMetadataLock.readLock()) {
+ cache.get(topic) match {
+ case Some(partitionInfos) => partitionInfos.contains(partitionId)
+ case None => false
+ }
+ }
+ }
+
+ def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+ brokerId: Int,
+ stateChangeLogger: StateChangeLogger) {
+ inLock(partitionMetadataLock.writeLock()) {
+ updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+ updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+ if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+ removePartitionInfo(tp.topic, tp.partition)
+ stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+ "sent by controller %d epoch %d with correlation id %d")
+ .format(brokerId, tp, updateMetadataRequest.controllerId,
+ updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+ } else {
+ addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+ stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+ "sent by controller %d epoch %d with correlation id %d")
+ .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+ updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+ }
+ }
+ }
+ }
+
+ private def removePartitionInfo(topic: String, partitionId: Int) = {
+ cache.get(topic) match {
+ case Some(infos) => {
+ infos.remove(partitionId)
+ if(infos.isEmpty) {
+ cache.remove(topic)
+ }
+ true
+ }
+ case None => false
+ }
+ }
+}
+
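[Note on the new MetadataCache] The class guards its two maps with a single ReentrantReadWriteLock: lookups such as getTopicMetadata and containsTopicAndPartition take the read lock, while addOrUpdatePartitionInfo and updateCache take the write lock, so concurrent metadata reads never block one another. A self-contained sketch of that pattern, with a local stand-in for kafka.utils.Utils.inLock:

import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable

object RwLockSketch {
  private val lock  = new ReentrantReadWriteLock()
  private val cache = new mutable.HashMap[String, Int]()

  // Same shape as kafka.utils.Utils.inLock: run body while holding l.
  private def inLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body finally l.unlock()
  }

  def put(k: String, v: Int): Unit = inLock(lock.writeLock()) { cache.put(k, v) }
  def get(k: String): Option[Int]  = inLock(lock.readLock())  { cache.get(k) }

  def main(args: Array[String]): Unit = {
    put("partitions", 3)
    println(get("partitions")) // Some(3)
  }
}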
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5588f59..11c20ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,16 +23,14 @@ import kafka.utils._
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.KafkaController
-import org.apache.log4j.Logger
import org.I0Itec.zkclient.ZkClient
import com.yammer.metrics.core.Gauge
import java.util.concurrent.atomic.AtomicBoolean
import java.io.{IOException, File}
import java.util.concurrent.TimeUnit
-
object ReplicaManager {
val UnknownLogEndOffset = -1L
val HighWatermarkFilename = "replication-offset-checkpoint"
@@ -205,6 +203,22 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
+ replicaStateChangeLock synchronized {
+ if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+ val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
+ "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
+ updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+ controllerEpoch)
+ stateChangeLogger.warn(stateControllerEpochErrorMessage)
+ throw new ControllerMovedException(stateControllerEpochErrorMessage)
+ } else {
+ metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+ controllerEpoch = updateMetadataRequest.controllerEpoch
+ }
+ }
+ }
+
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
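[Note on the ReplicaManager change above] maybeUpdateMetadataCache fences stale controllers: under replicaStateChangeLock it rejects any UpdateMetadataRequest whose controller epoch is older than the latest one seen, and otherwise applies the update and records the new epoch. A stripped-down sketch of that check, with hypothetical names and a plain exception in place of ControllerMovedException:

object EpochFenceSketch {
  final class StaleControllerException(msg: String) extends RuntimeException(msg)

  @volatile private var controllerEpoch = 5

  // Reject updates from an older epoch; apply and record the epoch otherwise.
  def maybeApply(requestEpoch: Int)(apply: => Unit): Unit = synchronized {
    if (requestEpoch < controllerEpoch)
      throw new StaleControllerException(
        s"request epoch $requestEpoch < latest known epoch $controllerEpoch")
    apply
    controllerEpoch = requestEpoch
  }

  def main(args: Array[String]): Unit = {
    maybeApply(6) { println("applied update from epoch 6") }
    try maybeApply(4) { println("never runs") }
    catch { case e: StaleControllerException => println(e.getMessage) }
  }
}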
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 17b08e1..b1c4ce9 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
- apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+ apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
EasyMock.replay(partitionStateInfo)
// This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
val requestChannel = new RequestChannel(2, 5)
val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
- apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+ apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
EasyMock.replay(partitionStateInfo)
/**
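[Note on the test change above] The test only tracks the rename of addPartitionInfo to addOrUpdatePartitionInfo; the mock setup is unchanged. For reference, a tiny sketch of the EasyMock nice-mock idiom the test relies on (assuming EasyMock on the classpath; the PartitionState trait here is illustrative, not the real PartitionStateInfo). A nice mock replayed with no recorded expectations returns type defaults for every call, which is all the cache entry needs:

import org.easymock.EasyMock

trait PartitionState { def leader: Int }

object NiceMockSketch {
  def main(args: Array[String]): Unit = {
    val state = EasyMock.createNiceMock(classOf[PartitionState])
    EasyMock.replay(state) // no expectations: every call returns a default
    println(state.leader)  // 0, the default for Int
  }
}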