You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/05/31 07:17:30 UTC
git commit: kafka-899; LeaderNotAvailableException the first time a new message for a partition is processed; patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 d93cbc610 -> 1caae2c2a
kafka-899; LeaderNotAvailableException the first time a new message for a partition is processed; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1caae2c2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1caae2c2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1caae2c2
Branch: refs/heads/0.8
Commit: 1caae2c2a10856ea2a31c8b82f1fae5b107a2a07
Parents: d93cbc6
Author: Jun Rao <ju...@gmail.com>
Authored: Thu May 30 22:16:47 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu May 30 22:16:47 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/common/UnknownTopicException.scala | 25 ----------
.../kafka/producer/async/DefaultEventHandler.scala | 36 +++++++++------
.../kafka/producer/async/ProducerSendThread.scala | 2 +-
3 files changed, 22 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/common/UnknownTopicException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/UnknownTopicException.scala b/core/src/main/scala/kafka/common/UnknownTopicException.scala
deleted file mode 100644
index 710d3bf..0000000
--- a/core/src/main/scala/kafka/common/UnknownTopicException.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-/**
- * Thrown when a request is made for a topic, that hasn't been created in a Kafka cluster
- */
-class UnknownTopicException(message: String) extends RuntimeException(message) {
- def this() = this(null)
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 1a74951..a00a0df 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -149,7 +149,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
try {
for (message <- messages) {
val topicPartitionsList = getPartitionListForTopic(message)
- val partitionIndex = getPartition(message.key, topicPartitionsList)
+ val partitionIndex = getPartition(message.topic, message.key, topicPartitionsList)
val brokerPartition = topicPartitionsList(partitionIndex)
// postpone the failure until the send operation, so that requests for other brokers are handled correctly
@@ -177,9 +177,9 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
Some(ret)
}catch { // Swallow recoverable exceptions and return None so that they can be retried.
- case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
- case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
- case oe => error("Failed to collate messages by topic, partition due to", oe); None
+ case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
+ case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
+ case oe => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
}
}
@@ -200,25 +200,24 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
* @param topicPartitionList the list of available partitions
* @return the partition id
*/
- private def getPartition(key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
+ private def getPartition(topic: String, key: K, topicPartitionList: Seq[PartitionAndLeader]): Int = {
val numPartitions = topicPartitionList.size
if(numPartitions <= 0)
- throw new UnknownTopicOrPartitionException("Invalid number of partitions: " + numPartitions +
- "\n Valid values are > 0")
+ throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
// If the key is null, we don't really need a partitioner so we just send to the next
// available partition
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
if (availablePartitions.isEmpty)
- throw new LeaderNotAvailableException("No leader for any partition")
+ throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
availablePartitions(index).partitionId
} else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
- throw new UnknownTopicOrPartitionException("Invalid partition id : " + partition +
- "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+ throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
+ "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
partition
}
@@ -253,11 +252,18 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
trace("Successfully sent message: %s".format(Utils.readString(message.message.payload)))))
}
- failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
- .map(partitionStatus => partitionStatus._1)
- if(failedTopicPartitions.size > 0)
- error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s"
- .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(",")))
+ val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
+ failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
+ if(failedTopicPartitions.size > 0) {
+ val errorString = failedPartitionsAndStatus
+ .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
+ (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
+ .map{
+ case(topicAndPartition, status) =>
+ topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
+ }.mkString(",")
+ warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
+ }
failedTopicPartitions
} else
Seq.empty[TopicAndPartition]
http://git-wip-us.apache.org/repos/asf/kafka/blob/1caae2c2/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 090400d..2b41a49 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -50,7 +50,7 @@ class ProducerSendThread[K,V](val threadName: String,
}
def shutdown = {
- info("Beging shutting down ProducerSendThread")
+ info("Begin shutting down ProducerSendThread")
queue.put(shutdownCommand)
shutdownLatch.await
info("Shutdown ProducerSendThread complete")