You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:03:55 UTC

[10/36] git commit: kafka-959; DefaultEventHandler can send more produce requests than necessary; patched by Guozhang Wang; reviewed by Jun Rao

kafka-959; DefaultEventHandler can send more produce requests than necessary; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/76d39052
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/76d39052
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/76d39052

Branch: refs/heads/trunk
Commit: 76d39052f717cb6d9fdac6a516df298036b5ffff
Parents: bc5620c
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Jul 25 10:10:35 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jul 25 10:10:35 2013 -0700

----------------------------------------------------------------------
 .../producer/async/DefaultEventHandler.scala    | 27 +++++++++++++++-----
 1 file changed, 20 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/76d39052/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index a00a0df..f71a242 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -45,12 +45,14 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
   private val topicMetadataRefreshInterval = config.topicMetadataRefreshIntervalMs
   private var lastTopicMetadataRefreshTime = 0L
   private val topicMetadataToRefresh = Set.empty[String]
+  private val sendPartitionPerTopicCache = HashMap.empty[String, Int]
 
   private val producerStats = ProducerStatsRegistry.getProducerStats(config.clientId)
   private val producerTopicStats = ProducerTopicStatsRegistry.getProducerTopicStats(config.clientId)
 
   def handle(events: Seq[KeyedMessage[K,V]]) {
     lock synchronized {
+      sendPartitionPerTopicCache.clear()
       val serializedData = serialize(events)
       serializedData.foreach {
         keyed =>
@@ -206,18 +208,29 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
     val partition =
       if(key == null) {
-        // If the key is null, we don't really need a partitioner so we just send to the next
-        // available partition
-        val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
-        if (availablePartitions.isEmpty)
-          throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
-        val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
-        availablePartitions(index).partitionId
+        // If the key is null, we don't really need a partitioner
+        // So we look up in the send partition cache for the topic to decide the target partition
+        val id = sendPartitionPerTopicCache.get(topic)
+        id match {
+          case Some(partitionId) =>
+            // directly return the partitionId without checking availability of the leader,
+            // since we want to postpone the failure until the send operation anyways
+            partitionId
+          case None =>
+            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
+            if (availablePartitions.isEmpty)
+              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
+            val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size
+            val partitionId = availablePartitions(index).partitionId
+            sendPartitionPerTopicCache.put(topic, partitionId)
+            partitionId
+        }
       } else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
+    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
     partition
   }