You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/02/29 04:22:14 UTC

svn commit: r1294959 - in /incubator/kafka/trunk/core/src: main/scala/kafka/producer/ test/scala/unit/kafka/javaapi/producer/ test/scala/unit/kafka/producer/

Author: junrao
Date: Wed Feb 29 03:22:14 2012
New Revision: 1294959

URL: http://svn.apache.org/viewvc?rev=1294959&view=rev
Log:
use propertyExists to test if both broker.list and zk.connect are present; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-290

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/Producer.scala Wed Feb 29 03:22:14 2012
@@ -33,10 +33,6 @@ class Producer[K,V](config: ProducerConf
                                                           /* use the other constructor*/
 extends Logging {
   private val hasShutdown = new AtomicBoolean(false)
-  if(!Utils.propertyExists(config.zkConnect) && !Utils.propertyExists(config.brokerList))
-    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
-  if (Utils.propertyExists(config.zkConnect) && Utils.propertyExists(config.brokerList))
-    warn("Both zk.connect and broker.list provided (zk.connect takes precedence).")
   private val random = new java.util.Random
   // check if zookeeper based auto partition discovery is enabled
   private val zkEnabled = Utils.propertyExists(config.zkConnect)

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ProducerConfig.scala Wed Feb 29 03:22:14 2012
@@ -29,13 +29,16 @@ class ProducerConfig(val props: Properti
    *  to pass in static broker and per-broker partition information. Format-    *
    *  brokerid1:host1:port1, brokerid2:host2:port2*/
   val brokerList = Utils.getString(props, "broker.list", null)
-  if(brokerList != null && Utils.getString(props, "partitioner.class", null) != null)
+  if(Utils.propertyExists(brokerList) && Utils.getString(props, "partitioner.class", null) != null)
     throw new InvalidConfigException("partitioner.class cannot be used when broker.list is set")
 
   /** If both broker.list and zk.connect options are specified, throw an exception */
-  if(brokerList != null && zkConnect != null)
+  if(Utils.propertyExists(brokerList) && Utils.propertyExists(zkConnect))
     throw new InvalidConfigException("only one of broker.list and zk.connect can be specified")
 
+  if(!Utils.propertyExists(zkConnect) && !Utils.propertyExists(brokerList))
+    throw new InvalidConfigException("At least one of zk.connect or broker.list must be specified")
+
   /** the partitioner class for partitioning events amongst sub-topics */
   val partitionerClass = Utils.getString(props, "partitioner.class", "kafka.producer.DefaultPartitioner")
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -232,6 +232,7 @@ class ProducerTest extends JUnitSuite {
     val props = new Properties()
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
     producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
@@ -264,6 +265,7 @@ class ProducerTest extends JUnitSuite {
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("producer.type", "async")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
     producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
@@ -289,6 +291,7 @@ class ProducerTest extends JUnitSuite {
     val props = new Properties()
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
     try {
@@ -320,6 +323,7 @@ class ProducerTest extends JUnitSuite {
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("producer.type", "async")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       new ConcurrentHashMap[Int, kafka.producer.SyncProducer](), asyncProducers)
     try {

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -27,6 +27,7 @@ import org.scalatest.junit.JUnitSuite
 import kafka.producer.async._
 import kafka.serializer.Encoder
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.utils.TestZKUtils
 
 class AsyncProducerTest extends JUnitSuite {
 
@@ -54,6 +55,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("port", "9092")
     props.put("queue.size", "10")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new AsyncProducerConfig(props)
 
     val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -92,6 +94,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("port", "9092")
     props.put("queue.size", "10")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val config = new AsyncProducerConfig(props)
 
     val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -130,6 +133,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("queue.size", "10")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("batch.size", "5")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new AsyncProducerConfig(props)
 
@@ -168,6 +172,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("queue.size", "10")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("queue.time", "200")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new AsyncProducerConfig(props)
 
@@ -200,6 +205,7 @@ class AsyncProducerTest extends JUnitSui
     asyncProducerProps.put("queue.size", "10")
     asyncProducerProps.put("serializer.class", "kafka.producer.StringSerializer")
     asyncProducerProps.put("queue.time", "100")
+    asyncProducerProps.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new AsyncProducerConfig(asyncProducerProps)
     val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
@@ -226,6 +232,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("queue.size", "50")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("batch.size", "10")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new AsyncProducerConfig(props)
 
@@ -266,6 +273,7 @@ class AsyncProducerTest extends JUnitSui
     props.put("queue.size", "50")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("batch.size", "20")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new AsyncProducerConfig(props)
 

Modified: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1294959&r1=1294958&r2=1294959&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Feb 29 03:22:14 2012
@@ -238,6 +238,7 @@ class ProducerTest extends JUnitSuite {
     val props = new Properties()
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
     producerPool.send(producerPool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
@@ -270,6 +271,7 @@ class ProducerTest extends JUnitSuite {
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("producer.type", "async")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
     producerPool.send(producerPool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
@@ -295,6 +297,7 @@ class ProducerTest extends JUnitSuite {
     val props = new Properties()
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
     try {
@@ -326,6 +329,7 @@ class ProducerTest extends JUnitSuite {
     props.put("partitioner.class", "kafka.producer.NegativePartitioner")
     props.put("serializer.class", "kafka.producer.StringSerializer")
     props.put("producer.type", "async")
+    props.put("zk.connect", TestZKUtils.zookeeperConnect)
     val producerPool = new ProducerPool[String](new ProducerConfig(props), new StringSerializer,
       new ConcurrentHashMap[Int, SyncProducer](), asyncProducers)
     try {