Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/23 02:22:40 UTC

git commit: kafka-1410; MetadataCache cleanup; patched by Jun Rao; reviewed by Timothy Chen, Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/trunk ed68ba402 -> caafc9d61


kafka-1410; MetadataCache cleanup; patched by Jun Rao; reviewed by Timothy Chen, Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/caafc9d6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/caafc9d6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/caafc9d6

Branch: refs/heads/trunk
Commit: caafc9d614dcb8584fbaf01902fbeaf2d5ea6786
Parents: ed68ba4
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 22 17:22:27 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 22 17:22:27 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/server/KafkaApis.scala | 186 +++----------------
 .../main/scala/kafka/server/MetadataCache.scala | 151 +++++++++++++++
 .../scala/kafka/server/ReplicaManager.scala     |  20 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   4 +-
 4 files changed, 194 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1a4ffce..bb0359d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,12 +29,8 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
 import kafka.utils.{Pool, SystemTime, Logging}
 import kafka.network.RequestChannel.Response
-import kafka.cluster.Broker
 import kafka.controller.KafkaController
-import kafka.utils.Utils.inLock
 import org.I0Itec.zkclient.ZkClient
-import java.util.concurrent.locks.ReentrantReadWriteLock
-import kafka.controller.KafkaController.StateChangeLogger
 
 /**
  * Logic to handle the various Kafka requests
@@ -52,80 +48,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val fetchRequestPurgatory =
     new FetchRequestPurgatory(requestChannel, replicaManager.config.fetchPurgatoryPurgeIntervalRequests)
   private val delayedRequestMetrics = new DelayedRequestMetrics
-  /* following 3 data structures are updated by the update metadata request
-  * and is queried by the topic metadata request. */
   var metadataCache = new MetadataCache
-  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
-  private val partitionMetadataLock = new ReentrantReadWriteLock()
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
 
-  class MetadataCache {
-    private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
-      new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
-
-    def addPartitionInfo(topic: String,
-                         partitionId: Int,
-                         stateInfo: PartitionStateInfo) {
-      cache.get(topic) match {
-        case Some(infos) => infos.put(partitionId, stateInfo)
-        case None => {
-          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
-          cache.put(topic, newInfos)
-          newInfos.put(partitionId, stateInfo)
-        }
-      }
-    }
-
-    def removePartitionInfo(topic: String, partitionId: Int) = {
-      cache.get(topic) match {
-        case Some(infos) => {
-          infos.remove(partitionId)
-          if(infos.isEmpty) {
-            cache.remove(topic)
-          }
-          true
-        }
-        case None => false
-      }
-    }
-
-    def getPartitionInfos(topic: String) = cache(topic)
-
-    def containsTopicAndPartition(topic: String,
-                                  partitionId: Int): Boolean = {
-      cache.get(topic) match {
-        case Some(partitionInfos) => partitionInfos.contains(partitionId)
-        case None => false
-      }
-    }
-
-    def allTopics = cache.keySet
-
-    def removeTopic(topic: String) = cache.remove(topic)
-
-    def containsTopic(topic: String) = cache.contains(topic)
-
-    def updateCache(updateMetadataRequest: UpdateMetadataRequest,
-                    brokerId: Int,
-                    stateChangeLogger: StateChangeLogger) = {
-      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
-        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
-	        removePartitionInfo(tp.topic, tp.partition)
-          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
-                                   "sent by controller %d epoch %d with correlation id %d")
-                                   .format(brokerId, tp, updateMetadataRequest.controllerId,
-                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        } else {
-	        addPartitionInfo(tp.topic, tp.partition, info)
-          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
-                                   "sent by controller %d epoch %d with correlation id %d")
-                                   .format(brokerId, info, tp, updateMetadataRequest.controllerId,
-                                           updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
-        }
-      }
-    }
-  }
-
   /**
    * Top-level method that handles all requests and multiplexes to the right api
    */
@@ -154,14 +79,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
-  // ensureTopicExists is only for client facing requests
-  private def ensureTopicExists(topic: String) = {
-    inLock(partitionMetadataLock.readLock()) {
-      if (!metadataCache.containsTopic(topic))
-        throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
-    }
-  }
-
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -191,24 +108,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
-    // ensureTopicExists is only for client facing requests
-    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
-    // stop serving data to clients for the topic being deleted
-    val stateChangeLogger = replicaManager.stateChangeLogger
-    if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
-      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
-        "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
-        updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
-        replicaManager.controllerEpoch)
-      stateChangeLogger.warn(stateControllerEpochErrorMessage)
-      throw new ControllerMovedException(stateControllerEpochErrorMessage)
-    }
-    inLock(partitionMetadataLock.writeLock()) {
-      replicaManager.controllerEpoch = updateMetadataRequest.controllerEpoch
-      // cache the list of alive brokers in the cluster
-      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
-      metadataCache.updateCache(updateMetadataRequest, brokerId, stateChangeLogger)
-    }
+    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
+
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
@@ -388,7 +289,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
-        ensureTopicExists(topicAndPartition.topic)
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info =
           partitionOpt match {
@@ -491,7 +391,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
         val partitionData =
           try {
-            ensureTopicExists(topic)
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -562,7 +461,6 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
-        ensureTopicExists(topicAndPartition.topic)
         // ensure leader exists
         val localReplica = if(!offsetRequest.isFromDebuggingClient)
           replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -658,69 +556,33 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def getTopicMetadata(topics: Set[String]): Seq[TopicMetadata] = {
-    val config = replicaManager.config
-
-    // Returning all topics when requested topics are empty
-    val isAllTopics = topics.isEmpty
-    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
-    val topicsToBeCreated: mutable.ListBuffer[String] = new mutable.ListBuffer[String]
-
-    inLock(partitionMetadataLock.readLock()) {
-      val topicsRequested = if (isAllTopics) metadataCache.allTopics else topics
-      for (topic <- topicsRequested) {
-        if (isAllTopics || metadataCache.containsTopic(topic)) {
-          val partitionStateInfos = metadataCache.getPartitionInfos(topic)
-          val partitionMetadata = partitionStateInfos.map {
-            case (partitionId, partitionState) =>
-              val replicas = partitionState.allReplicas
-              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
-              var leaderInfo: Option[Broker] = None
-              var isrInfo: Seq[Broker] = Nil
-              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
-              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
-              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
-              debug("topic %s partition %s".format(topic, partitionId) + ";replicas = " + replicas + ", in sync replicas = " + isr + ", leader = " + leader)
-              try {
-                leaderInfo = aliveBrokers.get(leader)
-                if (!leaderInfo.isDefined)
-                  throw new LeaderNotAvailableException("Leader not available for topic %s partition %s".format(topic, partitionId))
-                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
-                if (replicaInfo.size < replicas.size)
-                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
-                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
-                if (isrInfo.size < isr.size)
-                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
-                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
-                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
-              } catch {
-                case e: Throwable =>
-                  debug("Error while fetching metadata for topic %s partition %s. Possible cause: %s".format(topic, partitionId, e.getMessage))
-                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
-                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-              }
+    val topicResponses = metadataCache.getTopicMetadata(topics)
+    if (topics.size > 0 && topicResponses.size != topics.size) {
+      val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
+      val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
+        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+          try {
+            if (topic == OffsetManager.OffsetsTopicName) {
+              AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor,
+                                     offsetManager.offsetsTopicConfig)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                .format(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor))
+            }
+            else {
+              AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+              info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+                   .format(topic, config.numPartitions, config.defaultReplicationFactor))
+            }
+          } catch {
+            case e: TopicExistsException => // let it go, possibly another broker created this topic
           }
-          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
-        } else if (config.autoCreateTopicsEnable || topic == OffsetManager.OffsetsTopicName) {
-          topicsToBeCreated += topic
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
         } else {
-          topicResponses += new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
+          new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
         }
       }
+      topicResponses.appendAll(responsesForNonExistentTopics)
     }
-
-    topicResponses.appendAll(topicsToBeCreated.map { topic =>
-      try {
-        if (topic == OffsetManager.OffsetsTopicName)
-          AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor, offsetManager.offsetsTopicConfig)
-        else
-          AdminUtils.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
-          info("Auto creation of topic %s with %d partitions and replication factor %d is successful!".format(topic, config.numPartitions, config.defaultReplicationFactor))
-      } catch {
-        case e: TopicExistsException => // let it go, possibly another broker created this topic
-      }
-      new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.LeaderNotAvailableCode)
-    })
-
     topicResponses
   }
 

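For context on the new getTopicMetadata flow above: requested topics that the cache cannot answer are computed by set difference and returned as placeholder metadata, with the error code depending on whether auto-creation applies. Below is a minimal, self-contained sketch of just that fallback; TopicMeta, the numeric error constants, and the withFallback helper are simplified stand-ins for illustration, not Kafka's actual types.

    // Sketch of the fallback in getTopicMetadata: topics missing from the cache
    // response get placeholder metadata whose error code depends on auto-creation.
    object TopicMetadataFallbackSketch {
      val NoError: Short = 0
      val LeaderNotAvailableCode: Short = 5
      val UnknownTopicOrPartitionCode: Short = 3

      // Simplified stand-in for kafka.api.TopicMetadata.
      case class TopicMeta(topic: String, errorCode: Short = NoError)

      def withFallback(requested: Set[String],
                       cached: Seq[TopicMeta],
                       autoCreateTopicsEnable: Boolean,
                       offsetsTopicName: String = "__consumer_offsets"): Seq[TopicMeta] = {
        // Same set difference as the patch: topics -- topicResponses.map(_.topic).toSet
        val nonExistent = requested -- cached.map(_.topic).toSet
        val placeholders = nonExistent.toSeq.map { topic =>
          if (topic == offsetsTopicName || autoCreateTopicsEnable)
            TopicMeta(topic, LeaderNotAvailableCode)       // real code first calls AdminUtils.createTopic(...)
          else
            TopicMeta(topic, UnknownTopicOrPartitionCode)  // no such topic and no auto-creation
        }
        cached ++ placeholders
      }

      def main(args: Array[String]): Unit = {
        val cached = Seq(TopicMeta("existing"))
        // Prints List(TopicMeta(existing,0), TopicMeta(missing,3))
        println(withFallback(Set("existing", "missing"), cached, autoCreateTopicsEnable = false))
      }
    }
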
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
new file mode 100644
index 0000000..a8b7bf7
--- /dev/null
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import scala.collection.{Seq, Set, mutable}
+import kafka.api._
+import kafka.cluster.Broker
+import java.util.concurrent.locks.ReentrantReadWriteLock
+import kafka.utils.Utils._
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException, LeaderNotAvailableException}
+import kafka.common.TopicAndPartition
+import kafka.controller.KafkaController.StateChangeLogger
+import scala.Some
+
+/**
+ *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
+ *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
+ */
+private[server] class MetadataCache {
+  private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
+    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
+  private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
+  private val partitionMetadataLock = new ReentrantReadWriteLock()
+
+  def getTopicMetadata(topics: Set[String]) = {
+    val isAllTopics = topics.isEmpty
+    val topicsRequested = if(isAllTopics) cache.keySet else topics
+    val topicResponses: mutable.ListBuffer[TopicMetadata] = new mutable.ListBuffer[TopicMetadata]
+    inLock(partitionMetadataLock.readLock()) {
+      for (topic <- topicsRequested) {
+        if (isAllTopics || cache.contains(topic)) {
+          val partitionStateInfos = cache(topic)
+          val partitionMetadata = partitionStateInfos.map {
+            case (partitionId, partitionState) =>
+              val replicas = partitionState.allReplicas
+              val replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
+              var leaderInfo: Option[Broker] = None
+              var isrInfo: Seq[Broker] = Nil
+              val leaderIsrAndEpoch = partitionState.leaderIsrAndControllerEpoch
+              val leader = leaderIsrAndEpoch.leaderAndIsr.leader
+              val isr = leaderIsrAndEpoch.leaderAndIsr.isr
+              val topicPartition = TopicAndPartition(topic, partitionId)
+              try {
+                leaderInfo = aliveBrokers.get(leader)
+                if (!leaderInfo.isDefined)
+                  throw new LeaderNotAvailableException("Leader not available for %s".format(topicPartition))
+                isrInfo = isr.map(aliveBrokers.getOrElse(_, null)).filter(_ != null)
+                if (replicaInfo.size < replicas.size)
+                  throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+                    replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+                if (isrInfo.size < isr.size)
+                  throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+                    isr.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
+                new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
+              } catch {
+                case e: Throwable =>
+                  debug("Error while fetching metadata for %s. Possible cause: %s".format(topicPartition, e.getMessage))
+                  new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo,
+                    ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              }
+          }
+          topicResponses += new TopicMetadata(topic, partitionMetadata.toSeq)
+        }
+      }
+    }
+    topicResponses
+  }
+
+  def addOrUpdatePartitionInfo(topic: String,
+                               partitionId: Int,
+                               stateInfo: PartitionStateInfo) {
+    inLock(partitionMetadataLock.writeLock()) {
+      cache.get(topic) match {
+        case Some(infos) => infos.put(partitionId, stateInfo)
+        case None => {
+          val newInfos: mutable.Map[Int, PartitionStateInfo] = new mutable.HashMap[Int, PartitionStateInfo]
+          cache.put(topic, newInfos)
+          newInfos.put(partitionId, stateInfo)
+        }
+      }
+    }
+  }
+
+  def getPartitionInfos(topic: String) = {
+    inLock(partitionMetadataLock.readLock()) {
+      cache(topic)
+    }
+  }
+
+  def containsTopicAndPartition(topic: String,
+                                partitionId: Int): Boolean = {
+    inLock(partitionMetadataLock.readLock()) {
+      cache.get(topic) match {
+        case Some(partitionInfos) => partitionInfos.contains(partitionId)
+        case None => false
+      }
+    }
+  }
+
+  def updateCache(updateMetadataRequest: UpdateMetadataRequest,
+                  brokerId: Int,
+                  stateChangeLogger: StateChangeLogger) {
+    inLock(partitionMetadataLock.writeLock()) {
+      updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
+      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
+        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
+          removePartitionInfo(tp.topic, tp.partition)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, tp, updateMetadataRequest.controllerId,
+            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        } else {
+          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d")
+            .format(brokerId, info, tp, updateMetadataRequest.controllerId,
+            updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+        }
+      }
+    }
+  }
+
+  private def removePartitionInfo(topic: String, partitionId: Int) = {
+    cache.get(topic) match {
+      case Some(infos) => {
+        infos.remove(partitionId)
+        if(infos.isEmpty) {
+          cache.remove(topic)
+        }
+        true
+      }
+      case None => false
+    }
+  }
+}
+

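The new MetadataCache class moves the partitionMetadataLock that previously lived in KafkaApis inside the cache itself, so every accessor takes the read or write lock. A minimal sketch of that pattern is below; the local inLock helper and SimpleCache class are stand-ins for illustration (the real code uses kafka.utils.Utils.inLock and caches PartitionStateInfo, not a partition count).

    import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
    import scala.collection.mutable

    object LockedCacheSketch {
      // Local stand-in for kafka.utils.Utils.inLock (same shape, assumed here).
      private def inLock[T](lock: Lock)(body: => T): T = {
        lock.lock()
        try body finally lock.unlock()
      }

      class SimpleCache {
        private val cache = new mutable.HashMap[String, Int]()
        private val rwLock = new ReentrantReadWriteLock()

        // Writers take the write lock so readers never observe a half-applied update.
        def put(topic: String, partitions: Int): Unit =
          inLock(rwLock.writeLock()) { cache.put(topic, partitions) }

        // Readers share the read lock; many metadata requests can proceed in parallel.
        def get(topic: String): Option[Int] =
          inLock(rwLock.readLock()) { cache.get(topic) }
      }

      def main(args: Array[String]): Unit = {
        val c = new SimpleCache
        c.put("my-topic", 8)
        println(c.get("my-topic")) // Some(8)
      }
    }
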
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 5588f59..11c20ce 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,16 +23,14 @@ import kafka.utils._
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common._
-import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
+import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
-import org.apache.log4j.Logger
 import org.I0Itec.zkclient.ZkClient
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
 
-
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
   val HighWatermarkFilename = "replication-offset-checkpoint"
@@ -205,6 +203,22 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
+    replicaStateChangeLock synchronized {
+      if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
+        val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
+          "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
+          updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
+          controllerEpoch)
+        stateChangeLogger.warn(stateControllerEpochErrorMessage)
+        throw new ControllerMovedException(stateControllerEpochErrorMessage)
+      } else {
+        metadataCache.updateCache(updateMetadataRequest, localBrokerId, stateChangeLogger)
+        controllerEpoch = updateMetadataRequest.controllerEpoch
+      }
+    }
+  }
+
   def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
                              offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>

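maybeUpdateMetadataCache above fences out requests from a deposed controller by comparing epochs under replicaStateChangeLock before touching the cache. A stripped-down sketch of that check follows; EpochFencingSketch, the plain IllegalStateException, and the applyUpdate callback are stand-ins for the real ControllerMovedException and metadataCache.updateCache call.

    // Sketch of the controller-epoch fencing in maybeUpdateMetadataCache.
    class EpochFencingSketch(brokerId: Int) {
      private val lock = new Object
      @volatile private var controllerEpoch: Int = 0

      def maybeApply(requestEpoch: Int, correlationId: Int)(applyUpdate: => Unit): Unit = {
        lock.synchronized {
          if (requestEpoch < controllerEpoch) {
            val msg = ("Broker %d received update metadata request with correlation id %d from an " +
              "old controller with epoch %d. Latest known controller epoch is %d")
              .format(brokerId, correlationId, requestEpoch, controllerEpoch)
            throw new IllegalStateException(msg)   // real code throws ControllerMovedException
          } else {
            applyUpdate                            // real code: metadataCache.updateCache(...)
            controllerEpoch = requestEpoch         // adopt the newer epoch only after the update
          }
        }
      }
    }
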
http://git-wip-us.apache.org/repos/asf/kafka/blob/caafc9d6/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 17b08e1..b1c4ce9 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -96,7 +96,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
 
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -169,7 +169,7 @@ class SimpleFetchTest extends JUnit3Suite {
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
-    apis.metadataCache.addPartitionInfo(topic, partitionId, partitionStateInfo)
+    apis.metadataCache.addOrUpdatePartitionInfo(topic, partitionId, partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
 
     /**
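
The test changes above track the rename from addPartitionInfo to addOrUpdatePartitionInfo; the new name spells out that a second call for the same (topic, partition) key simply replaces the cached entry, as in this tiny sketch (AddOrUpdateSketch is an illustrative stand-in, not part of the patch).

    import scala.collection.mutable

    // Sketch of the add-or-update semantics behind the renamed method: a repeated
    // call for the same key replaces the cached value rather than failing.
    object AddOrUpdateSketch {
      def main(args: Array[String]): Unit = {
        val cache = new mutable.HashMap[(String, Int), String]()
        cache.put(("topic1", 0), "leader=1")
        cache.put(("topic1", 0), "leader=2") // second call overwrites the first
        println(cache(("topic1", 0)))        // prints leader=2
      }
    }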