You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/14 06:17:29 UTC
svn commit: r1372724 [2/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/producer/async/ core/src/main/scala/k...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Aug 14 04:17:27 2012
@@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suit
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
- val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -80,7 +80,7 @@ class LogRecoveryTest extends JUnit3Suit
server2 = TestUtils.createServer(configProps2)
servers ++= List(server1, server2)
- val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -150,7 +150,7 @@ class LogRecoveryTest extends JUnit3Suit
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
- val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
@@ -194,7 +194,7 @@ class LogRecoveryTest extends JUnit3Suit
hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
- val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
+ val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
producerProps.put("producer.request.timeout.ms", "1000")
producerProps.put("producer.request.required.acks", "-1")
producer = new Producer[Int, Message](new ProducerConfig(producerProps))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Tue Aug 14 04:17:27 2012
@@ -55,7 +55,7 @@ class ReplicaFetchTest extends JUnit3Sui
}
// send test messages to leader
- val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)
+ val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromConfigs(configs), new StringEncoder)
producer.send(new ProducerData[String, String](topic1, testMessageList1),
new ProducerData[String, String](topic2, testMessageList2))
producer.close()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Aug 14 04:17:27 2012
@@ -49,7 +49,7 @@ class ServerShutdownTest extends JUnit3S
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
- val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
+ val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
// send some messages
producer.send(new ProducerData[Int, Message](topic, 0, sent1))
@@ -63,7 +63,7 @@ class ServerShutdownTest extends JUnit3S
{
- val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(zkConnect, 64*1024, 100000, 10000)))
+ val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
val consumer = new SimpleConsumer(host,
port,
1000000,
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Aug 14 04:17:27 2012
@@ -33,10 +33,10 @@ import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
-import kafka.serializer.{DefaultEncoder, Encoder}
import kafka.common.ErrorMapping
import kafka.api._
import collection.mutable.{Map, Set}
+import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
/**
@@ -114,12 +114,17 @@ object TestUtils extends Logging {
yield createBrokerConfig(node, port)
}
+ def getBrokerListStrFromConfigs(configs: Seq[KafkaConfig]): String = {
+ configs.map(c => c.hostName + ":" + c.port).mkString(",")
+ }
+
/**
* Create a test config for the given node id
*/
def createBrokerConfig(nodeId: Int, port: Int): Properties = {
val props = new Properties
props.put("brokerid", nodeId.toString)
+ props.put("hostname", "localhost")
props.put("port", port.toString)
props.put("log.dir", TestUtils.tempDir().getAbsolutePath)
props.put("log.flush.interval", "1")
@@ -283,9 +288,9 @@ object TestUtils extends Logging {
/**
* Create a producer for the given host and port
*/
- def createProducer[K, V](zkConnect: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
+ def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
val props = new Properties()
- props.put("zk.connect", zkConnect)
+ props.put("broker.list", brokerList)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
@@ -293,11 +298,11 @@ object TestUtils extends Logging {
new Producer[K, V](new ProducerConfig(props))
}
- def getProducerConfig(zkConnect: String, bufferSize: Int, connectTimeout: Int,
+ def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int,
reconnectInterval: Int): Properties = {
val props = new Properties()
props.put("producer.type", "sync")
- props.put("zk.connect", zkConnect)
+ props.put("broker.list", brokerList)
props.put("partitioner.class", "kafka.utils.FixedValuePartitioner")
props.put("buffer.size", bufferSize.toString)
props.put("connect.timeout.ms", connectTimeout.toString)
@@ -348,6 +353,11 @@ object TestUtils extends Logging {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
}
+ def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
+ val encoder = new StringEncoder
+ new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
+ }
+
def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
}
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java Tue Aug 14 04:17:27 2012
@@ -30,7 +30,7 @@ public class Producer extends Thread
public Producer(String topic)
{
props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("zk.connect", "localhost:2181");
+ props.put("broker.list", "localhost:9092");
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Tue Aug 14 04:17:27 2012
@@ -71,9 +71,9 @@ object ProducerPerformance extends Loggi
}
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
- val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
+ val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
.withRequiredArg
- .describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
+ .describedAs("hostname:port")
.ofType(classOf[String])
val produceRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
@@ -115,7 +115,7 @@ object ProducerPerformance extends Loggi
.defaultsTo(0)
val options = parser.parse(args : _*)
- for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
+ for(arg <- List(topicOpt, brokerListOpt, numMessagesOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
@@ -128,7 +128,7 @@ object ProducerPerformance extends Loggi
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
- val brokerInfo = options.valueOf(brokerInfoOpt)
+ val brokerList = options.valueOf(brokerListOpt)
val messageSize = options.valueOf(messageSizeOpt).intValue
var isFixSize = !options.has(varyMessageSizeOpt)
var isAsync = options.has(asyncOpt)
@@ -170,13 +170,7 @@ object ProducerPerformance extends Loggi
val allDone: CountDownLatch,
val rand: Random) extends Runnable {
val props = new Properties()
- val brokerInfoList = config.brokerInfo.split("=")
- if (brokerInfoList(0) == "zk.connect") {
- props.put("zk.connect", brokerInfoList(1))
- props.put("zk.sessiontimeout.ms", "300000")
- }
- else
- props.put("broker.list", brokerInfoList(1))
+ props.put("broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/bin/run-test.sh Tue Aug 14 04:17:27 2012
@@ -231,14 +231,14 @@ start_servers_cluster() {
start_producer_perf() {
this_topic=$1
- zk_conn_str=$2
+ broker_list_str=$2
no_msg_to_produce=$3
init_msg_id=$4
info "starting producer performance"
${base_dir}/bin/kafka-run-class.sh kafka.perf.ProducerPerformance \
- --brokerinfo "zk.connect=${zk_conn_str}" \
+ --broker-list ${broker_list_str} \
--topic ${this_topic} \
--messages $no_msg_to_produce \
--message-size 100 \
@@ -501,7 +501,7 @@ start_test() {
fi
init_id=$(( ($i - 1) * $producer_msg_batch_size ))
- start_producer_perf $test_topic localhost:$zk_port $producer_msg_batch_size $init_id
+ start_producer_perf $test_topic localhost:9091,localhost:9092,localhost:9093 $producer_msg_batch_size $init_id
info "sleeping for 15s"
sleep 15
echo
Modified: incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties?rev=1372724&r1=1372723&r2=1372724&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties (original)
+++ incubator/kafka/branches/0.8/system_test/single_host_multi_brokers/config/producer.properties Tue Aug 14 04:17:27 2012
@@ -16,20 +16,9 @@
############################# Producer Basics #############################
-# need to set either broker.list or zk.connect
-
# configure brokers statically
# format: host1:port1,host2:port2 ...
-#broker.list=0:localhost:9092
-
-# discover brokers from ZK
-zk.connect=localhost:2181
-
-# zookeeper session timeout; default is 6000
-#zk.sessiontimeout.ms=
-
-# the max time that the client waits to establish a connection to zookeeper; default is 6000
-#zk.connectiontimeout.ms
+broker.list=localhost:9091,localhost:9092,localhost:9093
# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=