You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/27 01:40:29 UTC
git commit: KAFKA-774 Periodic refresh of topic metadata on the
producer does not check for error code in the response;
reviewed by Swapnil Ghike
Updated Branches:
refs/heads/0.8 37ca9db78 -> 6989dac88
KAFKA-774 Periodic refresh of topic metadata on the producer does not check for error code in the response; reviewed by Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6989dac8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6989dac8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6989dac8
Branch: refs/heads/0.8
Commit: 6989dac889f2afd3cd192d345d3bd29995e605b9
Parents: 37ca9db
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Feb 26 16:34:11 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Feb 26 16:34:11 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/admin/AdminUtils.scala | 57 +++++++--------
.../kafka/common/LeaderNotAvailableException.scala | 7 +-
.../scala/kafka/producer/BrokerPartitionInfo.scala | 5 +-
.../kafka/producer/async/DefaultEventHandler.scala | 1 -
4 files changed, 33 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 93a634e..b9ef4dc 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -87,55 +87,50 @@ object AdminUtils extends Logging {
case e2 => throw new AdministrationException(e2.toString)
}
}
-
- def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
+
+ def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient): TopicMetadata =
fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker])
def fetchTopicMetadataFromZk(topics: Set[String], zkClient: ZkClient): Set[TopicMetadata] = {
val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
topics.map(topic => fetchTopicMetadataFromZk(topic, zkClient, cachedBrokerInfo))
}
-
+
private def fetchTopicMetadataFromZk(topic: String, zkClient: ZkClient, cachedBrokerInfo: mutable.HashMap[Int, Broker]): TopicMetadata = {
if(ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic)).get(topic).get
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
-
val partitionMetadata = sortedPartitions.map { partitionMap =>
- val partition = partitionMap._1
- val replicas = partitionMap._2
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
- debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
+ val partition = partitionMap._1
+ val replicas = partitionMap._2
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
- var leaderInfo: Option[Broker] = None
- var replicaInfo: Seq[Broker] = Nil
- var isrInfo: Seq[Broker] = Nil
- try {
+ var leaderInfo: Option[Broker] = None
+ var replicaInfo: Seq[Broker] = Nil
+ var isrInfo: Seq[Broker] = Nil
try {
- leaderInfo = leader match {
- case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
- case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+ try {
+ leaderInfo = leader match {
+ case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head)
+ case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
+ }
+ } catch {
+ case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition), e)
+ }
+ try {
+ replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
+ isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
+ } catch {
+ case e => throw new ReplicaNotAvailableException(e)
}
- } catch {
- case e => throw new LeaderNotAvailableException("Leader not available for topic %s partition %d".format(topic, partition))
- }
-
- try {
- replicaInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt))
- isrInfo = getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas)
- } catch {
- case e => throw new ReplicaNotAvailableException(e)
- }
-
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
} catch {
- case e: ReplicaNotAvailableException =>
+ case e =>
+ error("Error while fetching metadata for partition [%s,%d]".format(topic, partition), e)
new PartitionMetadata(partition, leaderInfo, replicaInfo, isrInfo,
ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- case le: LeaderNotAvailableException =>
- new PartitionMetadata(partition, None, replicaInfo, isrInfo,
- ErrorMapping.codeFor(le.getClass.asInstanceOf[Class[Throwable]]))
}
}
new TopicMetadata(topic, partitionMetadata)
http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
index 7df8b40..972728e 100644
--- a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
+++ b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala
@@ -20,6 +20,7 @@ package kafka.common
/**
* Thrown when a request is made for partition, but no leader exists for that partition
*/
-class LeaderNotAvailableException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
\ No newline at end of file
+class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+ def this(message: String) = this(message, null)
+ def this() = this(null, null)
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
index a0e2b44..617fc43 100644
--- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
+++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
@@ -80,10 +80,11 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig,
if(tmd.errorCode == ErrorMapping.NoError){
topicPartitionInfo.put(tmd.topic, tmd)
} else
- warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
+ warn("Error while fetching metadata for topic [%s]: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode))
tmd.partitionsMetadata.foreach(pmd =>{
if (pmd.errorCode != ErrorMapping.NoError){
- debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode))
+ warn("Error while fetching metadata for topic partition [%s,%d]: [%s]".format(tmd.topic, pmd.partitionId, pmd),
+ ErrorMapping.exceptionFor(pmd.errorCode))
}
})
})
http://git-wip-us.apache.org/repos/asf/kafka/blob/6989dac8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 8954a03..4cfe2a4 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -81,7 +81,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
if(outstandingProduceRequests.size > 0) {
producerStats.failedSendRate.mark()
-
val correlationIdEnd = correlationId.get()
error("Failed to send requests for topics %s with correlation ids in [%d,%d]".format(outstandingProduceRequests.map(_.topic).mkString(","),
correlationIdStart, correlationIdEnd-1))