Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/15 23:15:26 UTC

svn commit: r1410055 [3/3] - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/impl/ contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/consum...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Thu Nov 15 22:15:14 2012
@@ -24,6 +24,7 @@ import kafka.message.{Message, ByteBuffe
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.producer._
+import kafka.utils.IntEncoder
 import kafka.utils.TestUtils._
 import kafka.admin.CreateTopicCommand
 import kafka.api.FetchRequestBuilder
@@ -36,19 +37,21 @@ class ServerShutdownTest extends JUnit3S
 
   val host = "localhost"
   val topic = "test"
-  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
-  val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
+  val sent1 = List("hello", "there")
+  val sent2 = List("more", "messages")
 
   @Test
   def testCleanShutdown() {
     var server = new KafkaServer(config)
     server.startup()
-    var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+    val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)
+    producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString)
+    var producer = new Producer[Int, String](new ProducerConfig(producerConfig))
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     // send some messages
-    producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+    producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 
     // do a clean shutdown and check that the clean shutdown file is written out
     server.shutdown()
@@ -62,7 +65,7 @@ class ServerShutdownTest extends JUnit3S
     server = new KafkaServer(config)
     server.startup()
 
-    producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+    producer = new Producer[Int, String](new ProducerConfig(producerConfig))
     val consumer = new SimpleConsumer(host,
                                       port,
                                       1000000,
@@ -75,18 +78,18 @@ class ServerShutdownTest extends JUnit3S
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
+    assertEquals(sent1, fetchedMessage.map(m => Utils.readString(m.message.payload)))
     val newOffset = fetchedMessage.last.nextOffset
 
     // send some more messages
-    producer.send(new ProducerData[Int, Message](topic, 0, sent2))
+    producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*)
 
     fetchedMessage = null
     while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
       val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
       fetchedMessage = fetched.messageSet(topic, 0)
     }
-    TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
+    assertEquals(sent2, fetchedMessage.map(m => Utils.readString(m.message.payload)))
 
     consumer.close()
     producer.close()
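
For reference, the producer API exercised above can be driven standalone along these lines. This is a minimal sketch, not code from the commit: the broker list and topic are placeholders, and it assumes the StringEncoder/IntEncoder wiring used by the test (IntEncoder lives in the test-only kafka.utils package, per the TestUtils change below).

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    val props = new Properties()
    props.put("broker.list", "localhost:9092")                   // placeholder broker list
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("key.serializer.class", "kafka.utils.IntEncoder")  // test-only encoder, defined in TestUtils below
    val producer = new Producer[Int, String](new ProducerConfig(props))

    // send() is now variadic over KeyedMessage(topic, key, message)
    producer.send(List("hello", "there").map(m => new KeyedMessage[Int, String]("test", 0, m)): _*)
    producer.close()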

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Nov 15 22:15:14 2012
@@ -288,13 +288,16 @@ object TestUtils extends Logging {
   /**
    * Create a producer for the given host and port
    */
-  def createProducer[K, V](brokerList: String, encoder: Encoder[V] = new DefaultEncoder): Producer[K, V] = {
+  def createProducer[K, V](brokerList: String, 
+                           encoder: Encoder[V] = new DefaultEncoder(), 
+                           keyEncoder: Encoder[K] = new DefaultEncoder()): Producer[K, V] = {
     val props = new Properties()
     props.put("broker.list", brokerList)
     props.put("buffer.size", "65536")
     props.put("connect.timeout.ms", "100000")
     props.put("reconnect.interval", "10000")
     props.put("serializer.class", encoder.getClass.getCanonicalName)
+    props.put("key.serializer.class", keyEncoder.getClass.getCanonicalName)
     new Producer[K, V](new ProducerConfig(props))
   }
 
@@ -307,6 +310,8 @@ object TestUtils extends Logging {
     props.put("buffer.size", bufferSize.toString)
     props.put("connect.timeout.ms", connectTimeout.toString)
     props.put("reconnect.interval", reconnectInterval.toString)
+    props.put("producer.request.timeout.ms", 30000.toString)
+    props.put("serializer.class", classOf[StringEncoder].getName.toString)
     props
   }
 
@@ -353,11 +358,6 @@ object TestUtils extends Logging {
     produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
   }
 
-  def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
-    val encoder = new StringEncoder
-    new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
-  }
-
   def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
     produceRequestWithAcks(List(topic), List(partition), message, SyncProducerConfig.DefaultRequiredAcks)
   }
@@ -490,30 +490,22 @@ object TestZKUtils {
   val zookeeperConnect = "127.0.0.1:2182"
 }
 
-class StringSerializer extends Encoder[String] {
-  def toEvent(message: Message):String = message.toString
-  def toMessage(event: String):Message = new Message(event.getBytes)
-  def getTopic(event: String): String = event.concat("-topic")
-}
-
-class NegativePartitioner extends Partitioner[String] {
-  def partition(data: String, numPartitions: Int): Int = {
-    -1
-  }
+class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
+  override def toBytes(n: Int) = n.toString.getBytes
 }
 
-class StaticPartitioner extends Partitioner[String] {
+class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
   def partition(data: String, numPartitions: Int): Int = {
     (data.length % numPartitions)
   }
 }
 
-class HashPartitioner extends Partitioner[String] {
+class HashPartitioner(props: VerifiableProperties = null) extends Partitioner[String] {
   def partition(data: String, numPartitions: Int): Int = {
     (data.hashCode % numPartitions)
   }
 }
 
-class FixedValuePartitioner extends Partitioner[Int] {
+class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner[Int] {
   def partition(data: Int, numPartitions: Int): Int = data
 }
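
In 0.8 an Encoder is just a toBytes function, and encoders and partitioners are constructed reflectively with a VerifiableProperties argument, which is why every class above now takes one. A hypothetical encoder following the same pattern as IntEncoder (the class name and payload format are invented for illustration):

    import kafka.serializer.Encoder
    import kafka.utils.VerifiableProperties

    // Hypothetical example; follows the constructor/toBytes contract shown above.
    class CsvEncoder(props: VerifiableProperties = null) extends Encoder[Seq[String]] {
      override def toBytes(fields: Seq[String]): Array[Byte] =
        fields.mkString(",").getBytes("UTF-8")
    }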

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Thu Nov 15 22:15:14 2012
@@ -17,6 +17,8 @@
 
 package kafka.utils
 
+import java.util.Arrays
+import java.nio.ByteBuffer
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
@@ -51,5 +53,13 @@ class UtilsTest extends JUnitSuite {
     assertEquals(2, its.next())
     assertEquals(1, its.next())
   }
+  
+  @Test
+  def testReadBytes() {
+    for(testCase <- List("", "a", "abcd")) {
+      val bytes = testCase.getBytes
+      assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
+    }
+  }
 
 }
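
The test pins down the observable behavior of Utils.readBytes: it returns a copy of the buffer's bytes. A sketch that would satisfy the assertions above (the actual implementation in Utils may differ, e.g. in how it treats position and limit):

    import java.nio.ByteBuffer

    // Sketch only: copies the remaining bytes without disturbing the source buffer.
    def readBytes(buffer: ByteBuffer): Array[Byte] = {
      val bytes = new Array[Byte](buffer.remaining)
      buffer.duplicate.get(bytes)
      bytes
    }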

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Consumer.java Thu Nov 15 22:15:14 2012
@@ -56,10 +56,10 @@ public class Consumer extends Thread
   public void run() {
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
     topicCountMap.put(topic, new Integer(1));
-    Map<String, List<KafkaStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-    KafkaStream<Message> stream =  consumerMap.get(topic).get(0);
-    ConsumerIterator<Message> it = stream.iterator();
+    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
+    ConsumerIterator<byte[], byte[]> it = stream.iterator();
     while(it.hasNext())
-      System.out.println(ExampleUtils.getMessage(it.next().message()));
+      System.out.println(new String(it.next().message()));
   }
 }
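
The high-level consumer streams are now typed by key and value, defaulting to raw Array[Byte] for both. A rough Scala equivalent of the Java loop above (a sketch: the ZooKeeper address, group id, and topic are placeholders, and the exact config key names should be checked against ConsumerConfig on this branch):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181") // placeholder; key name assumed per 0.8 ConsumerConfig
    props.put("group.id", "example-group")           // placeholder
    val connector = Consumer.create(new ConsumerConfig(props))

    // createMessageStreams yields KafkaStream[Array[Byte], Array[Byte]] by default
    val streams = connector.createMessageStreams(Map("test" -> 1))
    for (stream <- streams("test"); msg <- stream)   // blocks waiting for messages
      println(new String(msg.message))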

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/Producer.java Thu Nov 15 22:15:14 2012
@@ -18,7 +18,7 @@ package kafka.examples;
 
 
 import java.util.Properties;
-import kafka.javaapi.producer.ProducerData;
+import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 
 public class Producer extends Thread
@@ -42,7 +42,7 @@ public class Producer extends Thread
     while(true)
     {
       String messageStr = new String("Message_" + messageNo);
-      producer.send(new ProducerData<Integer, String>(topic, messageStr));
+      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
       messageNo++;
     }
   }
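
Both KeyedMessage forms appear in this commit: the keyed constructor (topic, key, message) in ServerShutdownTest above, and the unkeyed (topic, message) form here, which sends without a key:

    import kafka.producer.KeyedMessage

    val keyed   = new KeyedMessage[Int, String]("test", 0, "hello")   // as in ServerShutdownTest
    val unkeyed = new KeyedMessage[Int, String]("test", "Message_1")  // as in Producer.java above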

Modified: incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java (original)
+++ incubator/kafka/branches/0.8/examples/src/main/java/kafka/examples/SimpleConsumerDemo.java Thu Nov 15 22:15:14 2012
@@ -19,6 +19,9 @@ package kafka.examples;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.javaapi.FetchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -29,9 +32,12 @@ import java.util.Map;
 
 public class SimpleConsumerDemo {
     
-  private static void printMessages(ByteBufferMessageSet messageSet) {
-    for (MessageAndOffset messageAndOffset : messageSet) {
-      System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
+  private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+    for(MessageAndOffset messageAndOffset: messageSet) {
+      ByteBuffer payload = messageAndOffset.message().payload();
+      byte[] bytes = new byte[payload.limit()];
+      payload.get(bytes);
+      System.out.println(new String(bytes, "UTF-8"));
     }
   }
 
@@ -47,7 +53,7 @@ public class SimpleConsumerDemo {
     }
   }
   
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     generateData();
       
     SimpleConsumer simpleConsumer = new SimpleConsumer(KafkaProperties.kafkaServerURL,
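
The manual ByteBuffer copy above is the Java counterpart of the Utils.readString helper used in ServerShutdownTest earlier in this commit. A Scala sketch of the same printMessages logic:

    import kafka.message.ByteBufferMessageSet
    import kafka.utils.Utils

    // Sketch: decode each payload with the helper seen in ServerShutdownTest.
    def printMessages(messageSet: ByteBufferMessageSet): Unit =
      for (mo <- messageSet)
        println(Utils.readString(mo.message.payload))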

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala Thu Nov 15 22:15:14 2012
@@ -52,7 +52,7 @@ object ConsumerPerformance {
 
     val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
 
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
+    val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
     var threadList = List[ConsumerPerfThread]()
     for ((topic, streamList) <- topicMessageStreams)
       for (i <- 0 until streamList.length)
@@ -140,7 +140,7 @@ object ConsumerPerformance {
     val hideHeader = options.has(hideHeaderOpt)
   }
 
-  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
+  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
                            config:ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
     extends Thread(name) {
     private val shutdownLatch = new CountDownLatch(1)
@@ -160,7 +160,7 @@ object ConsumerPerformance {
       try {
         for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
           messagesRead += 1
-          bytesRead += messageAndMetadata.message.payloadSize
+          bytesRead += messageAndMetadata.message.length
 
           if (messagesRead % config.reportingInterval == 0) {
             if(config.showDetailedStats)

Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala?rev=1410055&r1=1410054&r2=1410055&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ProducerPerformance.scala Thu Nov 15 22:15:14 2012
@@ -23,6 +23,7 @@ import kafka.producer._
 import org.apache.log4j.Logger
 import kafka.message.{CompressionCodec, Message}
 import java.text.SimpleDateFormat
+import kafka.serializer._
 import java.util._
 import collection.immutable.List
 import kafka.utils.{VerifiableProperties, Logging}
@@ -201,9 +202,12 @@ object ProducerPerformance extends Loggi
     props.put("producer.request.timeout.ms", config.producerRequestTimeoutMs.toString)
     props.put("producer.num.retries", config.producerNumRetries.toString)
     props.put("producer.retry.backoff.ms", config.producerRetryBackoffMs.toString)
+    props.put("serializer.class", classOf[DefaultEncoder].getName.toString)
+    props.put("key.serializer.class", classOf[NullEncoder[Long]].getName.toString)
 
+    
     val producerConfig = new ProducerConfig(props)
-    val producer = new Producer[Message, Message](producerConfig)
+    val producer = new Producer[Long, Array[Byte]](producerConfig)
     val seqIdNumDigit = 10   // no. of digits for max int value
 
     val messagesPerThread = config.numMessages / config.numThreads
@@ -215,7 +219,7 @@ object ProducerPerformance extends Loggi
     private val threadIdLabel  = "ThreadID"
     private val topicLabel     = "Topic"
     private var leftPaddedSeqId : String = ""
-    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Message = {
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
       // Each thread gets a unique range of sequential no. for its ids.
       // Eg. 1000 msg in 10 threads => 100 msg per thread
       // thread 0 IDs :   0 ~  99
@@ -233,19 +237,18 @@ object ProducerPerformance extends Loggi
 
       val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
       debug(seqMsgString)
-      return new Message(seqMsgString.getBytes())
+      return seqMsgString.getBytes()
     }
 
-    private def generateProducerData(topic: String, messageId: Long): (ProducerData[Message, Message], Int) = {
+    private def generateProducerData(topic: String, messageId: Long): (KeyedMessage[Long, Array[Byte]], Int) = {
       val msgSize = if(config.isFixSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
       val message = if(config.seqIdMode) {
         val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
         generateMessageWithSeqId(topic, seqId, msgSize)
+      } else {
+        new Array[Byte](msgSize)
       }
-      else {
-        new Message(new Array[Byte](msgSize))
-      }
-      (new ProducerData[Message, Message](topic, null, message), message.payloadSize)
+      (new KeyedMessage[Long, Array[Byte]](topic, messageId, message), message.length)
     }
 
     override def run {
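
The key.serializer.class set above points at kafka.serializer.NullEncoder, presumably so the Long message id can drive partitioning without ever being written to the wire. Its presumed shape, inferred from the usage here (the real class may differ):

    import kafka.serializer.Encoder
    import kafka.utils.VerifiableProperties

    // Presumed shape of kafka.serializer.NullEncoder: keys encode to null,
    // so they can influence partitioning but add no bytes to the message.
    class NullEncoder[T](props: VerifiableProperties = null) extends Encoder[T] {
      override def toBytes(value: T): Array[Byte] = null
    }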