You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/27 01:40:29 UTC

git commit: KAFKA-774 Periodic refresh of topic metadata on the producer does not check for error code in the response; reviewed by Swapnil Ghike

Updated Branches:
  refs/heads/0.8 37ca9db78 -> 6989dac88


KAFKA-774 Periodic refresh of topic metadata on the producer does not check for error code in the response; reviewed by Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6989dac8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6989dac8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6989dac8

Branch: refs/heads/0.8
Commit: 6989dac889f2afd3cd192d345d3bd29995e605b9
Parents: 37ca9db
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Feb 26 16:34:11 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 26 16:34:11 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala   |   57 +++++++--------
 .../kafka/common/LeaderNotAvailableException.scala |    7 +-
 .../scala/kafka/producer/BrokerPartitionInfo.scala |    5 +-
 .../kafka/producer/async/DefaultEventHandler.scala |    1 -
 4 files changed, 33 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 93a634e..b9ef4dc 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -87,55 +87,50 @@ object AdminUtils extends Logging {
       case e2 => throw new AdministrationException(e2.toString)
     }
   }
-  
-  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata = 
+
+  def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
 
   def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
     val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
     topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
   }
-  
+
   private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
     if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
       val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-
       val partitionMetadata = sortedPartitions.map { partitionMap =>
-      val partition = partitionMap._1
-      val replicas = partitionMap._2
-      val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
-      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-      debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+        val partition = partitionMap._1
+        val replicas = partitionMap._2
+        val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+        debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-      var leaderInfo: Option[Broker] = None
-      var replicaInfo: Seq[Broker] = Nil
-      var isrInfo: Seq[Broker] = Nil
-      try {
+        var leaderInfo: Option[Broker] = None
+        var replicaInfo: Seq[Broker] = Nil
+        var isrInfo: Seq[Broker] = Nil
         try {
-          leaderInfo = leader match {
-            case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
-            case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+          try {
+            leaderInfo = leader match {
+              case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+              case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+            }
+          } catch {
+            case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+          }
+          try {
+            replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+            isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+          } catch {
+            case e => throw new ReplicaNotAvailableException(e)
           }
-        } catch {
-          case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition))
-        }
-
-        try {
-          replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
-          isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
-        } catch {
-          case e => throw new ReplicaNotAvailableException(e)
-        }
-
           new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
         } catch {
-          case e: ReplicaNotAvailableException =>
+          case e =>
+            error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
             new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
                                   ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-          case le: LeaderNotAvailableException =>
-            new PartitionMetadata(partition, None, replicaInfo, isrInfo,
-                                  ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
         }
       }
       new TopicMetadata(topic, partitionMetadata)

http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
index 7df8b40..972728e 100644
--- a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
+++ b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
@@ -20,6 +20,7 @@ package kafka.common
 /**
  * Thrown when a request is made for partition, but no leader exists for that partition
  */
-class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file
+class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index a0e2b44..617fc43 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,10 +80,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
       if(tmd.errorCode == ErrorMapping.NoError){
         topicPartitionInfo.put(tmd.topic, tmd)
       } else
-        warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+        warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
       tmd.partitionsMetadata.foreach(pmd =>{
         if (pmd.errorCode != ErrorMapping.NoError){
-          debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
+          warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
+               ErrorMapping.exceptionFor(pmd.errorCode))
         }
       })
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 8954a03..4cfe2a4 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -81,7 +81,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       }
       if(outstandingProduceRequests.size > 0) {
         producerStats.failedSendRate.mark()
-
         val correlationIdEnd = correlationId.get()
         error("Failed to send requests for topics %s with correlation ids in [%d,%d]".format(outstandingProduceRequests.map(_.topic).mkString(","),
           correlationIdStart, correlationIdEnd-1))