You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/10/04 02:54:14 UTC

svn commit: r1178659 - in /incubator/kafka/trunk: bin/kafka-console-producer.sh core/src/main/scala/kafka/producer/ConsoleProducer.scala

Author: jkreps
Date: Tue Oct  4 00:54:14 2011
New Revision: 1178659

URL: http://svn.apache.org/viewvc?rev=1178659&view=rev
Log:
KAFKA-130 Add a "console producer" that sends messages from standard input


Added:
    incubator/kafka/trunk/bin/kafka-console-producer.sh   (with props)
    incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala

Added: incubator/kafka/trunk/bin/kafka-console-producer.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/bin/kafka-console-producer.sh?rev=1178659&view=auto
==============================================================================
--- incubator/kafka/trunk/bin/kafka-console-producer.sh (added)
+++ incubator/kafka/trunk/bin/kafka-console-producer.sh Tue Oct  4 00:54:14 2011
@@ -0,0 +1,4 @@
+#!/bin/bash
+# Runs kafka.producer.ConsoleProducer through kafka-run-class.sh; quoting keeps paths/args with spaces intact.
+base_dir=$(dirname "$0")
+exec "$base_dir/kafka-run-class.sh" kafka.producer.ConsoleProducer "$@"

Propchange: incubator/kafka/trunk/bin/kafka-console-producer.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1178659&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Tue Oct  4 00:54:14 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import scala.collection.JavaConversions._
+import org.I0Itec.zkclient._
+import joptsimple._
+import org.apache.log4j.Logger
+import java.util.Arrays.asList
+import java.util.Properties
+import java.util.Random
+import java.io._
+import kafka.message._
+import kafka.utils._
+import kafka.serializer._
+
+/** Command-line tool that reads messages from standard input and sends them to a Kafka topic. */
+object ConsoleProducer {
+
+  private val logger = Logger.getLogger(getClass())
+
+  /**
+   * Parses the command line, configures a producer and sends every message returned by the
+   * message reader until it returns null. Exits with status 1 if --topic or --zookeeper is missing.
+   */
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+                         .withRequiredArg.describedAs("topic").ofType(classOf[String])
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
+                             .withRequiredArg.describedAs("connection_string").ofType(classOf[String])
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
+                             .withRequiredArg.describedAs("size").ofType(classOf[java.lang.Integer]).defaultsTo(200)
+    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" + 
+                                                   " a message will queue awaiting suffient batch size. The value is given in ms.")
+                               .withRequiredArg.describedAs("timeout_ms").ofType(classOf[java.lang.Long]).defaultsTo(1000)
+    val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
+                                  .withRequiredArg.describedAs("encoder_class").ofType(classOf[java.lang.String])
+                                  .defaultsTo(classOf[StringEncoder].getName)
+    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " + 
+                                                          "By default each line is read as a seperate message.")
+                                 .withRequiredArg.describedAs("reader_class").ofType(classOf[java.lang.String])
+                                 .defaultsTo(classOf[LineMessageReader].getName)
+    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + 
+                                                 "This allows custom configuration for a user-defined message reader.")
+                            .withRequiredArg.describedAs("prop").ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+    for(arg <- List(topicOpt, zkConnectOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val sync = options.has(syncOpt)
+    val compress = options.has(compressOpt)
+    val batchSize = options.valueOf(batchSizeOpt)
+    val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val encoderClass = options.valueOf(messageEncoderOpt)
+    val readerClass = options.valueOf(messageReaderOpt)
+    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+
+    val props = new Properties()
+    props.put("zk.connect", zkConnect)
+    // Compress only when --compress was given; the flag used to be parsed but ignored.
+    val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
+    // --sync selects the synchronous producer; the flag used to be inverted.
+    props.put("producer.type", if(sync) "sync" else "async")
+    // Stored as a String: Properties.getProperty returns null for non-String values.
+    if(options.has(batchSizeOpt))
+      props.put("batch.size", batchSize.toString)
+    props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+    props.put("serializer.class", encoderClass)
+
+    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
+    reader.init(System.in, cmdLineProps)
+    val producer = new Producer[Any, Any](new ProducerConfig(props))
+
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        producer.close()
+      }
+    })
+
+    try {
+      // A null from readMessage() marks the end of the input.
+      Iterator.continually(reader.readMessage()).takeWhile(_ != null)
+              .foreach(message => producer.send(new ProducerData(topic, message)))
+    } finally {
+      reader.close()
+    }
+  }
+
+  /**
+   * Converts "key=value" strings into a Properties object. Empty arguments and arguments made
+   * only of '=' are ignored. Prints an error and exits with status 1 if an argument has no '='.
+   * Each argument is split at its first '=' only, so values may be empty or contain '='.
+   */
+  def parseLineReaderArgs(args: Iterable[String]): Properties = {
+    // Limit of 2 keeps everything after the first '=' (including further '=') in the value.
+    val splits = args.filterNot(_.forall(_ == '=')).map(_.split("=", 2))
+    if(!splits.forall(_.length == 2)) {
+      System.err.println("Invalid line reader properties: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for(a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+
+  /** Pluggable message source; main() treats a null from readMessage() as end of input. */
+  trait MessageReader {
+    def init(inputStream: InputStream, props: Properties): Unit = {}
+    def readMessage(): AnyRef
+    def close(): Unit = {}
+  }
+
+  /** Default reader: every line of the input stream is returned as one message. */
+  class LineMessageReader extends MessageReader {
+    var reader: BufferedReader = null
+
+    override def init(inputStream: InputStream, props: Properties): Unit = {
+      reader = new BufferedReader(new InputStreamReader(inputStream))
+    }
+
+    override def readMessage(): String = reader.readLine()
+  }
+}