You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/15 23:15:26 UTC
svn commit: r1410055 [3/3] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/
core/src/main/scala/kafka/consumer/
core/src/main/scala/kafka/javaapi/consum...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Nov 15 22:15:14 2012
@@ -24,6 +24,7 @@ import kafka.message.{Message, ByteBuffe
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.producer._
+import kafka.utils.IntEncoder
import kafka.utils.TestUtils._
import kafka.admin.CreateTopicCommand
import kafka.api.FetchRequestBuilder
@@ -36,19 +37,21 @@ class ServerShutdownTest extends JUnit3S
val host = "localhost"
val topic = "test"
- val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
- val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
+ val sent1 = List("hello", "there")
+ val sent2 = List("more", "messages")
@Test
def testCleanShutdown() {
var server = new KafkaServer(config)
server.startup()
- var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+ val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)
+ producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
+ var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
// create topic
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
// send some messages
- producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+ producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
// do a clean shutdown and check that the clean shutdown file is written out
server.shutdown()
@@ -62,7 +65,7 @@ class ServerShutdownTest extends JUnit3S
server = new KafkaServer(config)
server.startup()
- producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+ producer = new Producer[Int, String](new ProducerConfig(producerConfig))
val consumer = new SimpleConsumer(host,
port,
1000000,
@@ -75,18 +78,18 @@ class ServerShutdownTest extends JUnit3S
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
- TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
+ assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload)))
val newOffset = fetchedMessage.last.nextOffset
// send some more messages
- producer.send(new ProducerData[Int, Message](topic, 0, sent2))
+ producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
fetchedMessage = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
fetchedMessage = fetched.messageSet(topic, 0)
}
- TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
+ assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload)))
consumer.close()
producer.close()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Nov 15 22:15:14 2012
@@ -288,13 +288,16 @@ object TestUtils extends Logging {
/**
* Create a producer for the given broker list
*/
- def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
+ def createProducer[K, V](brokerList: String,
+ encoder: Encoder[V] = new DefaultEncoder(),
+ keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
val props = new Properties()
props.put("broker.list", brokerList)
props.put("buffer.size", "65536")
props.put("connect.timeout.ms", "100000")
props.put("reconnect.interval", "10000")
props.put("serializer.class", encoder.getClass.getCanonicalName)
+ props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
new Producer[K, V](new ProducerConfig(props))
}
@@ -307,6 +310,8 @@ object TestUtils extends Logging {
props.put("buffer.size", bufferSize.toString)
props.put("connect.timeout.ms", connectTimeout.toString)
props.put("reconnect.interval", reconnectInterval.toString)
+ props.put("producer.request.timeout.ms", 30000.toString)
+ props.put("serializer.class", classOf[StringEncoder].getName.toString)
props
}
@@ -353,11 +358,6 @@ object TestUtils extends Logging {
produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
}
- def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
- val encoder = new StringEncoder
- new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
- }
-
def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
}
@@ -490,30 +490,22 @@ object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}
-class StringSerializer extends Encoder[String] {
- def toEvent(message: Message):String = message.toString
- def toMessage(event: String):Message = new Message(event.getBytes)
- def getTopic(event: String): String = event.concat("-topic")
-}
-
-class NegativePartitioner extends Partitioner[String] {
- def partition(data: String, numPartitions: Int): Int = {
- -1
- }
+class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
+ override def toBytes(n: Int) = n.toString.getBytes
}
-class StaticPartitioner extends Partitioner[String] {
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = {
(data.length % numPartitions)
}
}
-class HashPartitioner extends Partitioner[String] {
+class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
def partition(data: String, numPartitions: Int): Int = {
(data.hashCode % numPartitions)
}
}
-class FixedValuePartitioner extends Partitioner[Int] {
+class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
def partition(data: Int, numPartitions: Int): Int = data
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Thu Nov 15 22:15:14 2012
@@ -17,6 +17,8 @@
package kafka.utils
+import java.util.Arrays
+import java.nio.ByteBuffer
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -51,5 +53,13 @@ class UtilsTest extends JUnitSuite {
assertEquals(2, its.next())
assertEquals(1, its.next())
}
+
+ @Test
+ def testReadBytes() {
+ for(testCase <- List("", "a", "abcd")) {
+ val bytes = testCase.getBytes
+ assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
+ }
+ }
}
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java Thu Nov 15 22:15:14 2012
@@ -56,10 +56,10 @@ public class Consumer extends Thread
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
- Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- KafkaStream<Message> stream = consumerMap.get(topic).get(0);
- ConsumerIterator<Message> it = stream.iterator();
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
+ ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext())
- System.out.println(ExampleUtils.getMessage(it.next().message()));
+ System.out.println(new String(it.next().message()));
}
}
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java Thu Nov 15 22:15:14 2012
@@ -18,7 +18,7 @@ package kafka.examples;
import java.util.Properties;
-import kafka.javaapi.producer.ProducerData;
+import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Producer extends Thread
@@ -42,7 +42,7 @@ public class Producer extends Thread
while(true)
{
String messageStr = new String("Message_" + messageNo);
- producer.send(new ProducerData<Integer, String>(topic, messageStr));
+ producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
}
}
Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Thu Nov 15 22:15:14 2012
@@ -19,6 +19,9 @@ package kafka.examples;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import kafka.javaapi.consumer.SimpleConsumer;
@@ -29,9 +32,12 @@ import java.util.Map;
public class SimpleConsumerDemo {
- private static void printMessages(ByteBufferMessageSet messageSet) {
- for (MessageAndOffset messageAndOffset : messageSet) {
- System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
+ private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+ for(MessageAndOffset messageAndOffset: messageSet) {
+ ByteBuffer payload = messageAndOffset.message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ System.out.println(new String(bytes, "UTF-8"));
}
}
@@ -47,7 +53,7 @@ public class SimpleConsumerDemo {
}
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
generateData();
SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Thu Nov 15 22:15:14 2012
@@ -52,7 +52,7 @@ object ConsumerPerformance {
val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
- val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
+ val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
var threadList = List[ConsumerPerfThread]()
for ((topic, streamList) <- topicMessageStreams)
for (i <- 0 until streamList.length)
@@ -140,7 +140,7 @@ object ConsumerPerformance {
val hideHeader = options.has(hideHeaderOpt)
}
- class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
+ class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
extends Thread(name) {
private val shutdownLatch = new CountDownLatch(1)
@@ -160,7 +160,7 @@ object ConsumerPerformance {
try {
for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
messagesRead += 1
- bytesRead += messageAndMetadata.message.payloadSize
+ bytesRead += messageAndMetadata.message.length
if (messagesRead % config.reportingInterval == 0) {
if(config.showDetailedStats)
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Thu Nov 15 22:15:14 2012
@@ -23,6 +23,7 @@ import kafka.producer._
import org.apache.log4j.Logger
import kafka.message.{CompressionCodec, Message}
import java.text.SimpleDateFormat
+import kafka.serializer._
import java.util._
import collection.immutable.List
import kafka.utils.{VerifiableProperties, Logging}
@@ -201,9 +202,12 @@ object ProducerPerformance extends Loggi
props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("producer.num.retries", config.producerNumRetries.toString)
props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString)
+ props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
+ props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
+
val producerConfig = new ProducerConfig(props)
- val producer = new Producer[Message, Message](producerConfig)
+ val producer = new Producer[Long, Array[Byte]](producerConfig)
val seqIdNumDigit = 10 // no. of digits for max int value
val messagesPerThread = config.numMessages / config.numThreads
@@ -215,7 +219,7 @@ object ProducerPerformance extends Loggi
private val threadIdLabel = "ThreadID"
private val topicLabel = "Topic"
private var leftPaddedSeqId : String = ""
- private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message = {
+ private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
// Each thread gets a unique range of sequential no. for its ids.
// Eg. 1000 msg in 10 threads => 100 msg per thread
// thread 0 IDs : 0 ~ 99
@@ -233,19 +237,18 @@ object ProducerPerformance extends Loggi
val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
- return new Message(seqMsgString.getBytes())
+ return seqMsgString.getBytes()
}
- private def generateProducerData(topic: String, messageId: Long): (ProducerData[Message, Message], Int) = {
+ private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
val message = if(config.seqIdMode) {
val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
generateMessageWithSeqId(topic, seqId, msgSize)
+ } else {
+ new Array[Byte](msgSize)
}
- else {
- new Message(new Array[Byte](msgSize))
- }
- (new ProducerData[Message, Message](topic, null, message), message.payloadSize)
+ (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
}
override def run {