Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/22 17:12:59 UTC

git commit: KAFKA-820 Topic metadata request handling fails to return all metadata about replicas; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 7b14ebae3 -> 08b2a37c3


KAFKA-820 Topic metadata request handling fails to return all metadata about replicas; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08b2a37c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08b2a37c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08b2a37c

Branch: refs/heads/0.8
Commit: 08b2a37c33ed907614e9621f315bb737f9be490e
Parents: 7b14eba
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 22 09:12:41 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 22 09:12:41 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   36 ++++++++++-----
 .../scala/kafka/producer/BrokerPartitionInfo.scala |   10 ++--
 .../main/scala/kafka/producer/ProducerPool.scala   |    4 +-
 3 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
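
Before the per-file diffs, a rough sketch (not part of this commit) of how a metadata caller might treat the per-partition error codes once partial replica metadata is returned instead of the whole request failing. The TopicMetadata/PartitionMetadata field names follow the diffs below; the package in the import and the handlePartitionErrors helper are assumptions for illustration.

import kafka.api.TopicMetadata   // 0.8 metadata class; package assumed
import kafka.common.ErrorMapping

// Illustration only: a caller that needs just the leader can keep working
// even when the replica/isr lists in the response are incomplete.
def handlePartitionErrors(tmd: TopicMetadata): Unit = {
  tmd.partitionsMetadata.foreach { pmd =>
    pmd.errorCode match {
      case ErrorMapping.NoError =>
        // leader, replicas and isr are all populated
      case ErrorMapping.LeaderNotAvailableCode =>
        // no usable leader; refresh metadata and retry
      case _ =>
        // e.g. a replica-not-available code: the leader is still usable,
        // only the replica/isr metadata is missing
    }
  }
}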


http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b9ef4dc..f4bf3b9 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection._
+import mutable.ListBuffer
 import scala.collection.mutable
 import kafka.common._
 import scala.Some
@@ -111,13 +112,14 @@ object AdminUtils extends Logging {
         var replicaInfo: Seq[Broker] = Nil
         var isrInfo: Seq[Broker] = Nil
         try {
-          try {
-            leaderInfo = leader match {
-              case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-              case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
-            }
-          } catch {
-            case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+          leaderInfo = leader match {
+            case Some(l) =>
+              try {
+                Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+              } catch {
+                case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+              }
+            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
           try {
             replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
@@ -125,12 +127,18 @@ object AdminUtils extends Logging {
           } catch {
             case e => throw new ReplicaNotAvailableException(e)
           }
+          if(replicaInfo.size < replicas.size)
+            throw new ReplicaNotAvailableException("Replica information not available for following brokers: " +
+              replicas.filterNot(replicaInfo.map(_.id).contains(_)).mkString(","))
+          if(isrInfo.size < inSyncReplicas.size)
+            throw new ReplicaNotAvailableException("In Sync Replica information not available for following brokers: " +
+              inSyncReplicas.filterNot(isrInfo.map(_.id).contains(_)).mkString(","))
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
         } catch {
           case e =>
             error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
             new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
-                                  ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
         }
       }
       new TopicMetadata(topic, partitionMetadata)
@@ -143,19 +151,23 @@ object AdminUtils extends Logging {
   private def getBrokerInfoFromCache(zkClient: ZkClient,
                                      cachedBrokerInfo: scala.collection.mutable.Map[Int, Broker],
                                      brokerIds: Seq[Int]): Seq[Broker] = {
-    brokerIds.map { id =>
+    var failedBrokerIds: ListBuffer[Int] = new ListBuffer()
+    val brokerMetadata = brokerIds.map { id =>
       val optionalBrokerInfo = cachedBrokerInfo.get(id)
       optionalBrokerInfo match {
-        case Some(brokerInfo) => brokerInfo // return broker info from the cache
+        case Some(brokerInfo) => Some(brokerInfo) // return broker info from the cache
         case None => // fetch it from zookeeper
           ZkUtils.getBrokerInfo(zkClient, id) match {
             case Some(brokerInfo) =>
               cachedBrokerInfo += (id -> brokerInfo)
-              brokerInfo
-            case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id)
+              Some(brokerInfo)
+            case None =>
+              failedBrokerIds += id
+              None
           }
       }
     }
+    brokerMetadata.filter(_.isDefined).map(_.get)
   }
 
   private def getWrappedIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
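
The hunk above is the core of the fix: getBrokerInfoFromCache now skips brokers it cannot resolve instead of throwing, and the caller detects the shortfall and raises ReplicaNotAvailableException naming the missing broker ids, so leader information is still returned. A condensed restatement of that lookup, with simplified types and a fetchFromZk parameter standing in for ZkUtils.getBrokerInfo (illustration only, not the committed code):

import scala.collection.mutable
import kafka.cluster.Broker   // Broker's 0.8 package; assumed here

// Unresolved broker ids are dropped from the result rather than aborting the
// whole metadata response; the caller compares sizes to spot the gap.
def lookupBrokers(cache: mutable.Map[Int, Broker],
                  fetchFromZk: Int => Option[Broker],
                  brokerIds: Seq[Int]): Seq[Broker] =
  brokerIds.flatMap { id =>
    cache.get(id).orElse {
      val fetched = fetchFromZk(id)             // e.g. ZkUtils.getBrokerInfo(zkClient, id)
      fetched.foreach(b => cache += (id -> b))  // cache successful lookups
      fetched                                   // None => this id is simply skipped
    }
  }

// Caller-side check, as in the patch:
//   if (replicaInfo.size < replicas.size) throw new ReplicaNotAvailableException(...)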

http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index 617fc43..72597ef 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,12 +80,12 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       if(tmd.errorCode == ErrorMapping.NoError){
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
       tmd.partitionsMetadata.foreach(pmd =>{
-        if (pmd.errorCode != ErrorMapping.NoError){
-          warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
-               ErrorMapping.exceptionFor(pmd.errorCode))
-        }
+        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
+          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
+            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
+        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
       })
     })
     producerPool.updateProducer(topicsMetadata)
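
On the producer side, the hunk above narrows the per-partition warning to the leader-not-available case; other codes such as ReplicaNotAvailable are ignored because the producer never touches the replica/isr metadata. A tiny illustrative predicate (not in the patch) capturing when a partition remains usable for produce requests under that rule:

import kafka.api.PartitionMetadata   // 0.8 metadata class; package assumed
import kafka.common.ErrorMapping

// A partition can still be produced to as long as a leader is known,
// even if the broker reported that some replica metadata was unavailable.
def usableForProduce(pmd: PartitionMetadata): Boolean =
  pmd.leader.isDefined && pmd.errorCode != ErrorMapping.LeaderNotAvailableCode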

http://git-wip-us.apache.org/repos/asf/kafka/blob/08b2a37c/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 4970029..43df70b 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -43,9 +43,9 @@ class ProducerPool(val config: ProducerConfig) extends Logging {
   private val syncProducers = new HashMap[Int, SyncProducer]
   private val lock = new Object()
 
-  def updateProducer(topicMetadatas: Seq[TopicMetadata]) {
+  def updateProducer(topicMetadata: Seq[TopicMetadata]) {
     val newBrokers = new collection.mutable.HashSet[Broker]
-    topicMetadatas.foreach(tmd => {
+    topicMetadata.foreach(tmd => {
       tmd.partitionsMetadata.foreach(pmd => {
         if(pmd.leader.isDefined)
           newBrokers+=(pmd.leader.get)