You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC

[30/30] git commit: KAFKA-544. Follow-up items on key-retention. Addresses misc. comments from Joel, see ticket for details.

KAFKA-544. Follow-up items on key-retention. Addresses misc. comments from Joel, see ticket for details. 


git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1413839 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e6522c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e6522c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e6522c7

Branch: refs/heads/trunk
Commit: 6e6522c7c96c15007a69eba24ea0db8ec16fb978
Parents: 602acaf
Author: Edward Jay Kreps <jk...@apache.org>
Authored: Mon Nov 26 20:59:21 2012 +0000
Committer: Edward Jay Kreps <jk...@apache.org>
Committed: Mon Nov 26 20:59:21 2012 +0000

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala     |   23 ++++-
 .../scala/kafka/producer/ConsoleProducer.scala     |   68 ++++++++++++---
 .../kafka/producer/async/ProducerSendThread.scala  |    6 +-
 core/src/main/scala/kafka/serializer/Decoder.scala |    7 --
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    2 +-
 core/src/main/scala/kafka/utils/Utils.scala        |    2 +-
 6 files changed, 79 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 2a8e02d..39cd57d 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -89,7 +89,7 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("class")
             .ofType(classOf[String])
-            .defaultsTo(classOf[NewlineMessageFormatter].getName)
+            .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
             .withRequiredArg
             .describedAs("prop")
@@ -256,10 +256,27 @@ trait MessageFormatter {
   def close() {}
 }
 
-class NewlineMessageFormatter extends MessageFormatter {
+class DefaultMessageFormatter extends MessageFormatter {
+  var printKey = false // when true, writeTo emits the key and keySeparator before the value
+  var keySeparator = "\t".getBytes // bytes written between key and value
+  var lineSeparator = "\n".getBytes // bytes written after every value
+  
+  override def init(props: Properties) { // overrides the defaults above from print.key, key.separator and line.separator
+    if(props.containsKey("print.key"))
+      printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+    if(props.containsKey("key.separator"))
+      keySeparator = props.getProperty("key.separator").getBytes
+    if(props.containsKey("line.separator"))
+      lineSeparator = props.getProperty("line.separator").getBytes
+  }
+  
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+    if(printKey) {
+      output.write(if(key == null) "null".getBytes else key) // PrintStream.write(byte[]) throws NullPointerException on a null key
+      output.write(keySeparator)
+    }
     output.write(value)
-    output.write('\n')
+    output.write(lineSeparator)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8664cb1..e7f50e4 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -20,7 +20,9 @@ package kafka.producer
 import scala.collection.JavaConversions._
 import joptsimple._
 import java.util.Properties
+import java.util.regex._
 import java.io._
+import kafka.common._
 import kafka.message._
 import kafka.serializer._
 
@@ -49,13 +51,18 @@ object ConsoleProducer {
                                .describedAs("timeout_ms")
                                .ofType(classOf[java.lang.Long])
                                .defaultsTo(1000)
-    val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
+    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+                                 .withRequiredArg
+                                 .describedAs("encoder_class")
+                                 .ofType(classOf[java.lang.String])
+                                 .defaultsTo(classOf[StringEncoder].getName)
+    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
                                  .withRequiredArg
                                  .describedAs("encoder_class")
                                  .ofType(classOf[java.lang.String])
                                  .defaultsTo(classOf[StringEncoder].getName)
     val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + 
-                                                          "By default each line is read as a seperate message.")
+                                                          "By default each line is read as a separate message.")
                                   .withRequiredArg
                                   .describedAs("reader_class")
                                   .ofType(classOf[java.lang.String])
@@ -82,9 +89,11 @@ object ConsoleProducer {
     val compress = options.has(compressOpt)
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
-    val encoderClass = options.valueOf(messageEncoderOpt)
+    val keyEncoderClass = options.valueOf(keyEncoderOpt)
+    val valueEncoderClass = options.valueOf(valueEncoderOpt)
     val readerClass = options.valueOf(messageReaderOpt)
     val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+    cmdLineProps.put("topic", topic)
 
     val props = new Properties()
     props.put("broker.list", brokerList)
@@ -94,12 +103,13 @@ object ConsoleProducer {
     if(options.has(batchSizeOpt))
       props.put("batch.size", batchSize.toString)
     props.put("queue.time", sendTimeout.toString)
-    props.put("serializer.class", encoderClass)
+    props.put("key.serializer.class", keyEncoderClass)
+    props.put("serializer.class", valueEncoderClass)
 
-    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
+    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
     reader.init(System.in, cmdLineProps)
 
-    val producer = new Producer[Any, Any](new ProducerConfig(props))
+    val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
 
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -107,11 +117,11 @@ object ConsoleProducer {
       }
     })
 
-    var message: AnyRef = null
+    var message: KeyedMessage[AnyRef, AnyRef] = null
     do { 
       message = reader.readMessage()
       if(message != null)
-        producer.send(new KeyedMessage(topic, message))
+        producer.send(message)
     } while(message != null)
   }
 
@@ -127,19 +137,49 @@ object ConsoleProducer {
     props
   }
 
-  trait MessageReader { 
+  trait MessageReader[K,V] { 
     def init(inputStream: InputStream, props: Properties) {}
-    def readMessage(): AnyRef
+    def readMessage(): KeyedMessage[K,V]
     def close() {}
   }
 
-  class LineMessageReader extends MessageReader { 
+  class LineMessageReader extends MessageReader[String, String] {
+    var topic: String = null
     var reader: BufferedReader = null
-
-    override def init(inputStream: InputStream, props: Properties) { 
+    var parseKey = false
+    var keySeparator = "\t"
+    var ignoreError = false
+    var lineNumber = 0
+
+    override def init(inputStream: InputStream, props: Properties) { // reads topic, parse.key, key.separator and ignore.error from props
+      topic = props.getProperty("topic")
+      if(props.containsKey("parse.key"))
+        parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+      if(props.containsKey("key.separator")) // must test the same property name that is read on the next line
+        keySeparator = props.getProperty("key.separator")
+      if(props.containsKey("ignore.error"))
+        ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
       reader = new BufferedReader(new InputStreamReader(inputStream))
     }
 
-    override def readMessage() = reader.readLine()
+    override def readMessage() = { // returns the next input line as a KeyedMessage, or null once the input is exhausted
+      lineNumber += 1
+      val line = reader.readLine()
+      if(line == null) null else if(parseKey) { // readLine() yields null at end of stream; pass it on so the caller's send loop stops
+        line.indexOf(keySeparator) match {
+          case -1 =>
+            if(ignoreError)
+              new KeyedMessage(topic, line)
+            else
+              throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+          case n =>
+            new KeyedMessage(topic,
+                             line.substring(0, n), 
+                             if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
+        }
+      } else {
+        new KeyedMessage(topic, line) 
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 46ea7d4..c900c45 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -19,7 +19,7 @@ package kafka.producer.async
 
 import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
-import collection.mutable.ListBuffer
+import collection.mutable.ArrayBuffer
 import kafka.producer.KeyedMessage
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
@@ -57,7 +57,7 @@ class ProducerSendThread[K,V](val threadName: String,
 
   private def processEvents() {
     var lastSend = SystemTime.milliseconds
-    var events = new ListBuffer[KeyedMessage[K,V]]
+    var events = new ArrayBuffer[KeyedMessage[K,V]]
     var full: Boolean = false
 
     // drain the queue until you get a shutdown command
@@ -85,7 +85,7 @@ class ProducerSendThread[K,V](val threadName: String,
           // if either queue time has reached or batch size has reached, dispatch to event handler
           tryToHandle(events)
           lastSend = SystemTime.milliseconds
-          events = new ListBuffer[KeyedMessage[K,V]]
+          events = new ArrayBuffer[KeyedMessage[K,V]]
         }
     }
     // send the last batch of events

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/serializer/Decoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 9da8370..6fc3b1d 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -37,13 +37,6 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[B
 }
 
 /**
- * Decode messages without any key
- */
-class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
-  def fromBytes(bytes: Array[Byte]) = new Message(bytes)
-}
-
-/**
  * The string encoder translates strings into bytes. It uses UTF8 by default but takes
  * an optional property serializer.encoding to control this.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dac7056..e34a432 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -74,7 +74,7 @@ object SimpleConsumerShell extends Logging {
                            .withRequiredArg
                            .describedAs("class")
                            .ofType(classOf[String])
-                           .defaultsTo(classOf[NewlineMessageFormatter].getName)
+                           .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
                            .withRequiredArg
                            .describedAs("prop")

http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 42e3e18..c83752f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -426,7 +426,7 @@ object Utils extends Logging {
   }
 
   /**
-   * This method gets comma seperated values which contains key,value pairs and returns a map of
+   * This method gets comma separated values which contains key,value pairs and returns a map of
    * key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
    */
   def parseCsvMap(str: String): Map[String, String] = {