You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/24 21:41:36 UTC

svn commit: r1188333 - in /incubator/kafka/trunk/core/src: main/scala/kafka/producer/Producer.scala test/scala/unit/kafka/javaapi/producer/ProducerTest.scala test/scala/unit/kafka/producer/ProducerTest.scala

Author: nehanarkhede
Date: Mon Oct 24 19:41:35 2011
New Revision: 1188333

URL: http://svn.apache.org/viewvc?rev=1188333&view=rev
Log:
KAFKA-161 Producer using broker list does not load balance requests across multiple partitions on a broker; patched by nehanarkhede; reviewed by junrao

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Oct 24 19:41:35 2011
@@ -24,6 +24,7 @@ import java.util.Properties
 import kafka.cluster.{Partition, Broker}
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.api.ProducerRequest
 
 class Producer[K,V](config: ProducerConfig,
                     partitioner: Partitioner[K],
@@ -118,11 +119,11 @@ class Producer[K,V](config: ProducerConf
           brokerPartitionInfo.updateInfo
         }
 
-        val numBrokerPartitions = getNumPartitionsForTopic(pd)
-        val totalNumPartitions = numBrokerPartitions.length
+        val topicPartitionsList = getPartitionListForTopic(pd)
+        val totalNumPartitions = topicPartitionsList.length
 
         val partitionId = getPartition(pd.getKey, totalNumPartitions)
-        brokerIdPartition = Some(numBrokerPartitions(partitionId))
+        brokerIdPartition = Some(topicPartitionsList(partitionId))
         brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
         numRetries += 1
       }
@@ -145,32 +146,36 @@ class Producer[K,V](config: ProducerConf
   private def configSend(producerData: ProducerData[K,V]*) {
     val producerPoolRequests = producerData.map { pd =>
     // find the broker partitions registered for this topic
-      val numBrokerPartitions = getNumPartitionsForTopic(pd)
-      val totalNumPartitions = numBrokerPartitions.length
+      val topicPartitionsList = getPartitionListForTopic(pd)
+      val totalNumPartitions = topicPartitionsList.length
 
-      val partitionId = getPartition(pd.getKey, totalNumPartitions)
-      val brokerIdPartition = numBrokerPartitions(partitionId)
+      val randomBrokerId = random.nextInt(totalNumPartitions)
+      val brokerIdPartition = topicPartitionsList(randomBrokerId)
       val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
 
       if(logger.isDebugEnabled)
+        logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+                " on a randomly chosen partition")
+      val partition = ProducerRequest.RandomPartition
+      if(logger.isDebugEnabled)
         logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
           brokerIdPartition.partId)
       producerPool.getProducerPoolData(pd.getTopic,
-        new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+        new Partition(brokerIdPartition.brokerId, partition),
         pd.getData)
     }
     producerPool.send(producerPoolRequests: _*)
   }
 
-  private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+  private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
     if(logger.isDebugEnabled)
       logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
-    val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+    val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
     if(logger.isDebugEnabled)
-      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
-    val totalNumPartitions = numBrokerPartitions.length
+      logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+    val totalNumPartitions = topicPartitionsList.length
     if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
-    numBrokerPartitions
+    topicPartitionsList
   }
 
   /**

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Mon Oct 24 19:41:35 2011
@@ -169,10 +169,10 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to partition 0 due to the StaticPartitioner
+    // it should send to a random partition due to use of broker.list
     val messageList = new java.util.ArrayList[Message]
     messageList.add(new Message("t".getBytes()))
-    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -367,8 +367,8 @@ class ProducerTest extends JUnitSuite {
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -583,8 +583,8 @@ class ProducerTest extends JUnitSuite {
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
     val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Oct 24 19:41:35 2011
@@ -155,8 +155,8 @@ class ProducerTest extends JUnitSuite {
     // 2 sync producers
     val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
     val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
-    // it should send to partition 0 due to the StaticPartitioner
-    syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+    // it should send to a random partition due to use of broker.list
+    syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
     EasyMock.expectLastCall
     syncProducer1.close
     EasyMock.expectLastCall
@@ -373,8 +373,8 @@ class ProducerTest extends JUnitSuite {
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall
@@ -646,8 +646,8 @@ class ProducerTest extends JUnitSuite {
     // 2 async producers
     val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
     val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
-    // it should send to partition 0 (first partition) on second broker i.e broker2
-    asyncProducer1.send(topic, "test1", 0)
+    // it should send to a random partition due to use of broker.list
+    asyncProducer1.send(topic, "test1", -1)
     EasyMock.expectLastCall
     asyncProducer1.close
     EasyMock.expectLastCall