Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/14 06:17:29 UTC

svn commit: r1372724 [2/2] - in /incubator/kafka/branches/0.8: contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/k...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Aug 14 04:17:27 2012
@@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -80,7 +80,7 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configProps2)
     servers ++= List(server1, server2)
 
-    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -150,7 +150,7 @@ class LogRecoveryTest extends JUnit3Suit
     hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
     hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
 
-    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -194,7 +194,7 @@ class LogRecoveryTest extends JUnit3Suit
     hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
     hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
 
-    val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+    val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
     producer = new Producer[Int, Message](new ProducerConfig(producerProps))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Tue Aug 14 04:17:27 2012
@@ -55,7 +55,7 @@ class ReplicaFetchTest extends JUnit3Sui
     }
 
     // send test messages to leader
-    val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+    val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder)
     producer.send(new ProducerData[String, String](topic1, testMessageList1),
                   new ProducerData[String, String](topic2, testMessageList2))
     producer.close()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Aug 14 04:17:27 2012
@@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3S
       // create topic
       CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
 
-      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
 
       // send some messages
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@@ -63,7 +63,7 @@ class ServerShutdownTest extends JUnit3S
 
 
     {
-      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
+      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
       val consumer = new SimpleConsumer(host,
                                         port,
                                         1000000,

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Aug 14 04:17:27 2012
@@ -33,10 +33,10 @@ import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
-import kafka.serializer.{DefaultEncoder, Encoder}
 import kafka.common.ErrorMapping
 import kafka.api._
 import collection.mutable.{Map, Set}
+import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 
 
 /**
@@ -114,12 +114,17 @@ object TestUtils extends Logging {
     yield createBrokerConfig(node, port)
   }
 
+  def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
+    configs.map(c => c.hostName + ":" + c.port).mkString(",")
+  }
+
   /**
    * Create a test config for the given node id
    */
   def createBrokerConfig(nodeId: Int, port: Int): Properties = {
     val props = new Properties
     props.put("brokerid", nodeId.toString)
+    props.put("hostname", "localhost")
     props.put("port", port.toString)
     props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
     props.put("log.flush.interval", "1")
@@ -283,9 +288,9 @@ object TestUtils extends Logging {
   /**
    * Create a producer for the given host and port
    */
-  def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
+  def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
     val props = new Properties()
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", brokerList)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
@@ -293,11 +298,11 @@ object TestUtils extends Logging {
     new Producer[K, V](new ProducerConfig(props))
   }
 
-  def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
+  def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int,
                         reconnectInterval: Int): Properties = {
     val props = new Properties()
     props.put("producer.type", "sync")
-    props.put("zk.connect", zkConnect)
+    props.put("broker.list", brokerList)
     props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
     props.put("buffer.size", bufferSize.toString)
     props.put("connect.timeout.ms", connectTimeout.toString)
@@ -348,6 +353,11 @@ object TestUtils extends Logging {
     produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
   }
 
+  def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
+    val encoder = new StringEncoder
+    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
+  }
+
   def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
     produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
   }

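For context, a minimal sketch of how a test would wire the new TestUtils helpers together after this change. The two-broker setup, the createBrokerConfigs helper name, and the buffer/timeout values beyond those shown in the diffs above are assumptions for illustration, not part of this commit:

    import java.util.Properties
    import kafka.message.Message
    import kafka.producer.{Producer, ProducerConfig}
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils

    // Build broker configs (hostname now defaults to "localhost" per the change above),
    // derive the "host:port,host:port" broker list, and pass it to the producer config
    // instead of a zk.connect string.
    val configs: Seq[KafkaConfig] =
      TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))   // assumed helper in TestUtils
    val brokerList: String = TestUtils.getBrokerListStrFromConfigs(configs)
    val producerProps: Properties =
      TestUtils.getProducerConfig(brokerList, 64*1024, 100000, 10000)
    val producer = new Producer[Int, Message](new ProducerConfig(producerProps))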
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java Tue Aug 14 04:17:27 2012
@@ -30,7 +30,7 @@ public class Producer extends Thread
   public Producer(String topic)
   {
     props.put("serializer.class", "kafka.serializer.StringEncoder");
-    props.put("zk.connect", "localhost:2181");
+    props.put("broker.list", "localhost:9092");
     // Use random partitioner. Don't need the key type. Just set it to Integer.
     // The message is of type String.
     producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));

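The same change viewed from the client side: a brief Scala sketch equivalent to the Java example above, assuming the Scala kafka.producer API used in the test diffs (the topic name and message value are placeholders):

    import java.util.Properties
    import kafka.producer.{Producer, ProducerConfig, ProducerData}

    val props = new Properties()
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // Brokers are now listed statically instead of being discovered via zk.connect.
    props.put("broker.list", "localhost:9092")

    val producer = new Producer[Int, String](new ProducerConfig(props))
    // Mirrors the ProducerData(topic, messages) usage in ReplicaFetchTest above;
    // "test-topic" and the message are placeholders.
    producer.send(new ProducerData[Int, String]("test-topic", List("hello")))
    producer.close()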
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Aug 14 04:17:27 2012
@@ -71,9 +71,9 @@ object ProducerPerformance extends Loggi
   }
 
   class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
-    val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
       .withRequiredArg
-      .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+      .describedAs("hostname:port")
       .ofType(classOf[String])
     val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
       .withRequiredArg()
@@ -115,7 +115,7 @@ object ProducerPerformance extends Loggi
       .defaultsTo(0)
 
     val options = parser.parse(args : _*)
-    for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
+    for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -128,7 +128,7 @@ object ProducerPerformance extends Loggi
     val showDetailedStats = options.has(showDetailedStatsOpt)
     val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
     val hideHeader = options.has(hideHeaderOpt)
-    val brokerInfo = options.valueOf(brokerInfoOpt)
+    val brokerList = options.valueOf(brokerListOpt)
     val messageSize = options.valueOf(messageSizeOpt).intValue
     var isFixSize = !options.has(varyMessageSizeOpt)
     var isAsync = options.has(asyncOpt)
@@ -170,13 +170,7 @@ object ProducerPerformance extends Loggi
                        val allDone: CountDownLatch,
                        val rand: Random) extends Runnable {
     val props = new Properties()
-    val brokerInfoList = config.brokerInfo.split("=")
-    if (brokerInfoList(0) == "zk.connect") {
-      props.put("zk.connect", brokerInfoList(1))
-      props.put("zk.sessiontimeout.ms", "300000")
-    }
-    else
-      props.put("broker.list", brokerInfoList(1))
+    props.put("broker.list", config.brokerList)
     props.put("compression.codec", config.compressionCodec.codec.toString)
     props.put("reconnect.interval", Integer.MAX_VALUE.toString)
     props.put("buffer.size", (64*1024).toString)

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Aug 14 04:17:27 2012
@@ -231,14 +231,14 @@ start_servers_cluster() {
 
 start_producer_perf() {
     this_topic=$1
-    zk_conn_str=$2
+    broker_list_str=$2
     no_msg_to_produce=$3
     init_msg_id=$4
 
     info "starting producer performance"
 
     ${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
-        --brokerinfo "zk.connect=${zk_conn_str}" \
+        --broker-list ${broker_list_str} \
         --topic ${this_topic} \
         --messages $no_msg_to_produce \
         --message-size 100 \
@@ -501,7 +501,7 @@ start_test() {
         fi
 
         init_id=$(( ($i - 1) * $producer_msg_batch_size ))
-        start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
+        start_producer_perf $test_topic localhost:9091,localhost:9092,localhost:9093 $producer_msg_batch_size $init_id
         info "sleeping for 15s"
         sleep 15
         echo

Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties Tue Aug 14 04:17:27 2012
@@ -16,20 +16,9 @@
 
 ############################# Producer Basics #############################
 
-# need to set either broker.list or zk.connect
-
 # configure brokers statically
 # format: brokerid1:host1:port1,brokerid2:host2:port2 ...
-#broker.list=0:localhost:9092
-
-# discover brokers from ZK
-zk.connect=localhost:2181
-
-# zookeeper session timeout; default is 6000
-#zk.sessiontimeout.ms=
-
-# the max time that the client waits to establish a connection to zookeeper; default is 6000
-#zk.connectiontimeout.ms
+broker.list=localhost:9091,localhost:9092,localhost:9093
 
 # name of the partitioner class for partitioning events; default partition spreads data randomly
 #partitioner.class=