You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/18 18:44:12 UTC
[30/30] git commit: KAFKA-544. Follow-up items on key-retention.
Addresses misc. comments from Joel, see ticket for details.
KAFKA-544. Follow-up items on key-retention. Addresses misc. comments from Joel, see ticket for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1413839 13f79535-47bb-0310-9956-ffa450edef68
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e6522c7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e6522c7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e6522c7
Branch: refs/heads/trunk
Commit: 6e6522c7c96c15007a69eba24ea0db8ec16fb978
Parents: 602acaf
Author: Edward Jay Kreps <jk...@apache.org>
Authored: Mon Nov 26 20:59:21 2012 +0000
Committer: Edward Jay Kreps <jk...@apache.org>
Committed: Mon Nov 26 20:59:21 2012 +0000
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsoleConsumer.scala | 23 ++++-
.../scala/kafka/producer/ConsoleProducer.scala | 68 ++++++++++++---
.../kafka/producer/async/ProducerSendThread.scala | 6 +-
core/src/main/scala/kafka/serializer/Decoder.scala | 7 --
.../scala/kafka/tools/SimpleConsumerShell.scala | 2 +-
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
6 files changed, 79 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 2a8e02d..39cd57d 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -89,7 +89,7 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
- .defaultsTo(classOf[NewlineMessageFormatter].getName)
+ .defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property")
.withRequiredArg
.describedAs("prop")
@@ -256,10 +256,27 @@ trait MessageFormatter {
def close() {}
}
-class NewlineMessageFormatter extends MessageFormatter {
+class DefaultMessageFormatter extends MessageFormatter {
+ var printKey = false
+ var keySeparator = "\t".getBytes
+ var lineSeparator = "\n".getBytes
+
+ override def init(props: Properties) {
+ if(props.containsKey("print.key"))
+ printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
+ if(props.containsKey("key.separator"))
+ keySeparator = props.getProperty("key.separator").getBytes
+ if(props.containsKey("line.separator"))
+ lineSeparator = props.getProperty("line.separator").getBytes
+ }
+
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+ if(printKey) {
+ output.write(key)
+ output.write(keySeparator)
+ }
output.write(value)
- output.write('\n')
+ output.write(lineSeparator)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 8664cb1..e7f50e4 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -20,7 +20,9 @@ package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
+import java.util.regex._
import java.io._
+import kafka.common._
import kafka.message._
import kafka.serializer._
@@ -49,13 +51,18 @@ object ConsoleProducer {
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(1000)
- val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
+ val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+ .withRequiredArg
+ .describedAs("encoder_class")
+ .ofType(classOf[java.lang.String])
+ .defaultsTo(classOf[StringEncoder].getName)
+ val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
- "By default each line is read as a seperate message.")
+ "By default each line is read as a separate message.")
.withRequiredArg
.describedAs("reader_class")
.ofType(classOf[java.lang.String])
@@ -82,9 +89,11 @@ object ConsoleProducer {
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
- val encoderClass = options.valueOf(messageEncoderOpt)
+ val keyEncoderClass = options.valueOf(keyEncoderOpt)
+ val valueEncoderClass = options.valueOf(valueEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+ cmdLineProps.put("topic", topic)
val props = new Properties()
props.put("broker.list", brokerList)
@@ -94,12 +103,13 @@ object ConsoleProducer {
if(options.has(batchSizeOpt))
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
- props.put("serializer.class", encoderClass)
+ props.put("key.serializer.class", keyEncoderClass)
+ props.put("serializer.class", valueEncoderClass)
- val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
+ val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
reader.init(System.in, cmdLineProps)
- val producer = new Producer[Any, Any](new ProducerConfig(props))
+ val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
@@ -107,11 +117,11 @@ object ConsoleProducer {
}
})
- var message: AnyRef = null
+ var message: KeyedMessage[AnyRef, AnyRef] = null
do {
message = reader.readMessage()
if(message != null)
- producer.send(new KeyedMessage(topic, message))
+ producer.send(message)
} while(message != null)
}
@@ -127,19 +137,49 @@ object ConsoleProducer {
props
}
- trait MessageReader {
+ trait MessageReader[K,V] {
def init(inputStream: InputStream, props: Properties) {}
- def readMessage(): AnyRef
+ def readMessage(): KeyedMessage[K,V]
def close() {}
}
- class LineMessageReader extends MessageReader {
+ class LineMessageReader extends MessageReader[String, String] {
+ var topic: String = null
var reader: BufferedReader = null
-
- override def init(inputStream: InputStream, props: Properties) {
+ var parseKey = false
+ var keySeparator = "\t"
+ var ignoreError = false
+ var lineNumber = 0
+
+ override def init(inputStream: InputStream, props: Properties) {
+ topic = props.getProperty("topic")
+ if(props.containsKey("parse.key"))
+ parseKey = props.getProperty("parse.key").trim.toLowerCase.equals("true")
+      if(props.containsKey("key.separator"))
+ keySeparator = props.getProperty("key.separator")
+ if(props.containsKey("ignore.error"))
+ ignoreError = props.getProperty("ignore.error").trim.toLowerCase.equals("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}
- override def readMessage() = reader.readLine()
+ override def readMessage() = {
+ lineNumber += 1
+ val line = reader.readLine()
+ if(parseKey) {
+ line.indexOf(keySeparator) match {
+ case -1 =>
+ if(ignoreError)
+ new KeyedMessage(topic, line)
+ else
+ throw new KafkaException("No key found on line " + lineNumber + ": " + line)
+ case n =>
+ new KeyedMessage(topic,
+ line.substring(0, n),
+ if(n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size))
+ }
+ } else {
+ new KeyedMessage(topic, line)
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
index 46ea7d4..c900c45 100644
--- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
+++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
@@ -19,7 +19,7 @@ package kafka.producer.async
import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
-import collection.mutable.ListBuffer
+import collection.mutable.ArrayBuffer
import kafka.producer.KeyedMessage
import kafka.metrics.KafkaMetricsGroup
import com.yammer.metrics.core.Gauge
@@ -57,7 +57,7 @@ class ProducerSendThread[K,V](val threadName: String,
private def processEvents() {
var lastSend = SystemTime.milliseconds
- var events = new ListBuffer[KeyedMessage[K,V]]
+ var events = new ArrayBuffer[KeyedMessage[K,V]]
var full: Boolean = false
// drain the queue until you get a shutdown command
@@ -85,7 +85,7 @@ class ProducerSendThread[K,V](val threadName: String,
// if either queue time has reached or batch size has reached, dispatch to event handler
tryToHandle(events)
lastSend = SystemTime.milliseconds
- events = new ListBuffer[KeyedMessage[K,V]]
+ events = new ArrayBuffer[KeyedMessage[K,V]]
}
}
// send the last batch of events
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/serializer/Decoder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/serializer/Decoder.scala b/core/src/main/scala/kafka/serializer/Decoder.scala
index 9da8370..6fc3b1d 100644
--- a/core/src/main/scala/kafka/serializer/Decoder.scala
+++ b/core/src/main/scala/kafka/serializer/Decoder.scala
@@ -37,13 +37,6 @@ class DefaultDecoder(props: VerifiableProperties = null) extends Decoder[Array[B
}
/**
- * Decode messages without any key
- */
-class KeylessMessageDecoder(props: VerifiableProperties = null) extends Decoder[Message] {
- def fromBytes(bytes: Array[Byte]) = new Message(bytes)
-}
-
-/**
* The string encoder translates strings into bytes. It uses UTF8 by default but takes
* an optional property serializer.encoding to control this.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index dac7056..e34a432 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -74,7 +74,7 @@ object SimpleConsumerShell extends Logging {
.withRequiredArg
.describedAs("class")
.ofType(classOf[String])
- .defaultsTo(classOf[NewlineMessageFormatter].getName)
+ .defaultsTo(classOf[DefaultMessageFormatter].getName)
val messageFormatterArgOpt = parser.accepts("property")
.withRequiredArg
.describedAs("prop")
http://git-wip-us.apache.org/repos/asf/kafka/blob/6e6522c7/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 42e3e18..c83752f 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -426,7 +426,7 @@ object Utils extends Logging {
}
/**
- * This method gets comma seperated values which contains key,value pairs and returns a map of
+ * This method gets comma separated values which contains key,value pairs and returns a map of
* key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
*/
def parseCsvMap(str: String): Map[String, String] = {