You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:55 UTC
[10/36] git commit: kafka-959;
DefaultEventHandler can send more produce requests than necessary;
patched by Guozhang Wang; reviewed by Jun Rao
kafka-959; DefaultEventHandler can send more produce requests than necessary; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76d39052
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76d39052
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76d39052
Branch: refs/heads/trunk
Commit: 76d39052f717cb6d9fdac6a516df298036b5ffff
Parents: bc5620c
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Jul 25 10:10:35 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 25 10:10:35 2013 -0700
----------------------------------------------------------------------
.../producer/async/DefaultEventHandler.scala | 27 +++++++++++++++-----
1 file changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/76d39052/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a00a0df..f71a242 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -45,12 +45,14 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
private var lastTopicMetadataRefreshTime = 0L
private val topicMetadataToRefresh = Set.empty[String]
+ private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
def handle(events: Seq[KeyedMessage[K,V]]) {
lock synchronized {
+ sendPartitionPerTopicCache.clear()
val serializedData = serialize(events)
serializedData.foreach {
keyed =>
@@ -206,18 +208,29 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
val partition =
if(key == null) {
- // If the key is null, we don't really need a partitioner so we just send to the next
- // available partition
- val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
- if (availablePartitions.isEmpty)
- throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
- val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
- availablePartitions(index).partitionId
+ // If the key is null, we don't really need a partitioner
+ // So we look up in the send partition cache for the topic to decide the target partition
+ val id = sendPartitionPerTopicCache.get(topic)
+ id match {
+ case Some(partitionId) =>
+ // directly return the partitionId without checking availability of the leader,
+ // since we want to postpone the failure until the send operation anyways
+ partitionId
+ case None =>
+ val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
+ if (availablePartitions.isEmpty)
+ throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
+ val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
+ val partitionId = availablePartitions(index).partitionId
+ sendPartitionPerTopicCache.put(topic, partitionId)
+ partitionId
+ }
} else
partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
"; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
+ trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
partition
}