You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/24 21:41:36 UTC
svn commit: r1188333 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/producer/Producer.scala
test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
test/scala/unit/kafka/producer/ProducerTest.scala
Author: nehanarkhede
Date: Mon Oct 24 19:41:35 2011
New Revision: 1188333
URL: http://svn.apache.org/viewvc?rev=1188333&view=rev
Log:
KAFKA-161 Producer using broker list does not load balance requests across multiple partitions on a broker; patched by nehanarkhede; reviewed by junrao
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Mon Oct 24 19:41:35 2011
@@ -24,6 +24,7 @@ import java.util.Properties
import kafka.cluster.{Partition, Broker}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.{NoBrokersForPartitionException, InvalidConfigException, InvalidPartitionException}
+import kafka.api.ProducerRequest
class Producer[K,V](config: ProducerConfig,
partitioner: Partitioner[K],
@@ -118,11 +119,11 @@ class Producer[K,V](config: ProducerConf
brokerPartitionInfo.updateInfo
}
- val numBrokerPartitions = getNumPartitionsForTopic(pd)
- val totalNumPartitions = numBrokerPartitions.length
+ val topicPartitionsList = getPartitionListForTopic(pd)
+ val totalNumPartitions = topicPartitionsList.length
val partitionId = getPartition(pd.getKey, totalNumPartitions)
- brokerIdPartition = Some(numBrokerPartitions(partitionId))
+ brokerIdPartition = Some(topicPartitionsList(partitionId))
brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
numRetries += 1
}
@@ -145,32 +146,36 @@ class Producer[K,V](config: ProducerConf
private def configSend(producerData: ProducerData[K,V]*) {
val producerPoolRequests = producerData.map { pd =>
// find the broker partitions registered for this topic
- val numBrokerPartitions = getNumPartitionsForTopic(pd)
- val totalNumPartitions = numBrokerPartitions.length
+ val topicPartitionsList = getPartitionListForTopic(pd)
+ val totalNumPartitions = topicPartitionsList.length
- val partitionId = getPartition(pd.getKey, totalNumPartitions)
- val brokerIdPartition = numBrokerPartitions(partitionId)
+ val randomBrokerId = random.nextInt(totalNumPartitions)
+ val brokerIdPartition = topicPartitionsList(randomBrokerId)
val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
if(logger.isDebugEnabled)
+ logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port +
+ " on a randomly chosen partition")
+ val partition = ProducerRequest.RandomPartition
+ if(logger.isDebugEnabled)
logger.debug("Sending message to broker " + brokerInfo.host + ":" + brokerInfo.port + " on a partition " +
brokerIdPartition.partId)
producerPool.getProducerPoolData(pd.getTopic,
- new Partition(brokerIdPartition.brokerId, brokerIdPartition.partId),
+ new Partition(brokerIdPartition.brokerId, partition),
pd.getData)
}
producerPool.send(producerPoolRequests: _*)
}
- private def getNumPartitionsForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
+ private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
if(logger.isDebugEnabled)
logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
- val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
+ val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
if(logger.isDebugEnabled)
- logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
- val totalNumPartitions = numBrokerPartitions.length
+ logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
+ val totalNumPartitions = topicPartitionsList.length
if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
- numBrokerPartitions
+ topicPartitionsList
}
/**
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Mon Oct 24 19:41:35 2011
@@ -169,10 +169,10 @@ class ProducerTest extends JUnitSuite {
// 2 sync producers
val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
- // it should send to partition 0 due to the StaticPartitioner
+ // it should send to a random partition due to use of broker.list
val messageList = new java.util.ArrayList[Message]
messageList.add(new Message("t".getBytes()))
- syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
+ syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = messageList))
EasyMock.expectLastCall
syncProducer1.close
EasyMock.expectLastCall
@@ -367,8 +367,8 @@ class ProducerTest extends JUnitSuite {
// 2 async producers
val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to partition 0 (first partition) on second broker i.e broker2
- asyncProducer1.send(topic, "test1", 0)
+ // it should send to a random partition due to use of broker.list
+ asyncProducer1.send(topic, "test1", -1)
EasyMock.expectLastCall
asyncProducer1.close
EasyMock.expectLastCall
@@ -583,8 +583,8 @@ class ProducerTest extends JUnitSuite {
val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
val asyncProducer2 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to partition 0 (first partition) on second broker i.e broker2
- asyncProducer1.send(topic, "test1", 0)
+ // it should send to a random partition due to use of broker.list
+ asyncProducer1.send(topic, "test1", -1)
EasyMock.expectLastCall
asyncProducer1.close
EasyMock.expectLastCall
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1188333&r1=1188332&r2=1188333&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Oct 24 19:41:35 2011
@@ -155,8 +155,8 @@ class ProducerTest extends JUnitSuite {
// 2 sync producers
val syncProducers = new ConcurrentHashMap[Int, kafka.producer.SyncProducer]()
val syncProducer1 = EasyMock.createMock(classOf[kafka.producer.SyncProducer])
- // it should send to partition 0 due to the StaticPartitioner
- syncProducer1.send(topic, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
+ // it should send to a random partition due to use of broker.list
+ syncProducer1.send(topic, -1, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("t".getBytes())))
EasyMock.expectLastCall
syncProducer1.close
EasyMock.expectLastCall
@@ -373,8 +373,8 @@ class ProducerTest extends JUnitSuite {
// 2 async producers
val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to partition 0 (first partition) on second broker i.e broker2
- asyncProducer1.send(topic, "test1", 0)
+ // it should send to a random partition due to use of broker.list
+ asyncProducer1.send(topic, "test1", -1)
EasyMock.expectLastCall
asyncProducer1.close
EasyMock.expectLastCall
@@ -646,8 +646,8 @@ class ProducerTest extends JUnitSuite {
// 2 async producers
val asyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
val asyncProducer1 = EasyMock.createMock(classOf[AsyncProducer[String]])
- // it should send to partition 0 (first partition) on second broker i.e broker2
- asyncProducer1.send(topic, "test1", 0)
+ // it should send to a random partition due to use of broker.list
+ asyncProducer1.send(topic, "test1", -1)
EasyMock.expectLastCall
asyncProducer1.close
EasyMock.expectLastCall