You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/29 04:22:14 UTC
svn commit: r1294959 - in /incubator/kafka/trunk/core/src:
main/scala/kafka/producer/ test/scala/unit/kafka/javaapi/producer/
test/scala/unit/kafka/producer/
Author: junrao
Date: Wed Feb 29 03:22:14 2012
New Revision: 1294959
URL: http://svn.apache.org/viewvc?rev=1294959&view=rev
Log:
use propertyExists to test if both broker.list and zk.connect are present; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-290
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Wed Feb 29 03:22:14 2012
@@ -33,10 +33,6 @@ class Producer[K,V](config: ProducerConf
/* use the other constructor*/
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
- if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
- throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
- if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
- warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
private val random = new java.util.Random
// check if zookeeper based auto partition discovery is enabled
private val zkEnabled = Utils.propertyExists(config.zkConnect)
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Feb 29 03:22:14 2012
@@ -29,13 +29,16 @@ class ProducerConfig(val props: Properti
* to pass in static broker and per-broker partition information. Format- *
* brokerid1:host1:port1, brokerid2:host2:port2*/
val brokerList = Utils.getString(props, "broker.list", null)
- if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
+ if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
/** If both broker.list and zk.connect options are specified, throw an exception */
- if(brokerList != null && zkConnect != null)
+ if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
+ if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
+ throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+
/** the partitioner class for partitioning events amongst sub-topics */
val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -232,6 +232,7 @@ class ProducerTest extends JUnitSuite {
val props = new Properties()
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
@@ -264,6 +265,7 @@ class ProducerTest extends JUnitSuite {
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("producer.type", "async")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
@@ -289,6 +291,7 @@ class ProducerTest extends JUnitSuite {
val props = new Properties()
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
try {
@@ -320,6 +323,7 @@ class ProducerTest extends JUnitSuite {
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("producer.type", "async")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
try {
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -27,6 +27,7 @@ import org.scalatest.junit.JUnitSuite
import kafka.producer.async._
import kafka.serializer.Encoder
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.utils.TestZKUtils
class AsyncProducerTest extends JUnitSuite {
@@ -54,6 +55,7 @@ class AsyncProducerTest extends JUnitSui
props.put("port", "9092")
props.put("queue.size", "10")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -92,6 +94,7 @@ class AsyncProducerTest extends JUnitSui
props.put("port", "9092")
props.put("queue.size", "10")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -130,6 +133,7 @@ class AsyncProducerTest extends JUnitSui
props.put("queue.size", "10")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("batch.size", "5")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
@@ -168,6 +172,7 @@ class AsyncProducerTest extends JUnitSui
props.put("queue.size", "10")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("queue.time", "200")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
@@ -200,6 +205,7 @@ class AsyncProducerTest extends JUnitSui
asyncProducerProps.put("queue.size", "10")
asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer")
asyncProducerProps.put("queue.time", "100")
+ asyncProducerProps.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(asyncProducerProps)
val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -226,6 +232,7 @@ class AsyncProducerTest extends JUnitSui
props.put("queue.size", "50")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("batch.size", "10")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
@@ -266,6 +273,7 @@ class AsyncProducerTest extends JUnitSui
props.put("queue.size", "50")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("batch.size", "20")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new AsyncProducerConfig(props)
Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -238,6 +238,7 @@ class ProducerTest extends JUnitSuite {
val props = new Properties()
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
@@ -270,6 +271,7 @@ class ProducerTest extends JUnitSuite {
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("producer.type", "async")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
@@ -295,6 +297,7 @@ class ProducerTest extends JUnitSuite {
val props = new Properties()
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
try {
@@ -326,6 +329,7 @@ class ProducerTest extends JUnitSuite {
props.put("partitioner.class", "kafka.producer.NegativePartitioner")
props.put("serializer.class", "kafka.producer.StringSerializer")
props.put("producer.type", "async")
+ props.put("zk.connect", TestZKUtils.zookeeperConnect)
val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
try {