You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/10/04 02:54:14 UTC
svn commit: r1178659 - in /incubator/kafka/trunk:
bin/kafka-console-producer.sh
core/src/main/scala/kafka/producer/ConsoleProducer.scala
Author: jkreps
Date: Tue Oct 4 00:54:14 2011
New Revision: 1178659
URL: http://svn.apache.org/viewvc?rev=1178659&view=rev
Log:
KAFKA-130 Add a "console producer" that sends messages from standard input
Added:
incubator/kafka/trunk/bin/kafka-console-producer.sh (with props)
incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
Added: incubator/kafka/trunk/bin/kafka-console-producer.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/bin/kafka-console-producer.sh?rev=1178659&view=auto
==============================================================================
--- incubator/kafka/trunk/bin/kafka-console-producer.sh (added)
+++ incubator/kafka/trunk/bin/kafka-console-producer.sh Tue Oct 4 00:54:14 2011
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+base_dir=$(dirname $0)
+$base_dir/kafka-run-class.sh kafka.producer.ConsoleProducer $@
Propchange: incubator/kafka/trunk/bin/kafka-console-producer.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala?rev=1178659&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/producer/ConsoleProducer.scala Tue Oct 4 00:54:14 2011
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import scala.collection.JavaConversions._
+import org.I0Itec.zkclient._
+import joptsimple._
+import org.apache.log4j.Logger
+import java.util.Arrays.asList
+import java.util.Properties
+import java.util.Random
+import java.io._
+import kafka.message._
+import kafka.utils._
+import kafka.serializer._
+
+object ConsoleProducer {
+
+ private val logger = Logger.getLogger(getClass())
+
+ def main(args: Array[String]) {
+ val parser = new OptionParser
+ val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+ .withRequiredArg
+ .describedAs("topic")
+ .ofType(classOf[String])
+ val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
+ .withRequiredArg
+ .describedAs("connection_string")
+ .ofType(classOf[String])
+ val asyncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+ val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+ val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
+ .withRequiredArg
+ .describedAs("size")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(200)
+ val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+ " a message will queue awaiting suffient batch size. The value is given in ms.")
+ .withRequiredArg
+ .describedAs("timeout_ms")
+ .ofType(classOf[java.lang.Long])
+ .defaultsTo(1000)
+ val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
+ .withRequiredArg
+ .describedAs("encoder_class")
+ .ofType(classOf[java.lang.String])
+ .defaultsTo(classOf[StringEncoder].getName)
+ val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+ "By default each line is read as a seperate message.")
+ .withRequiredArg
+ .describedAs("reader_class")
+ .ofType(classOf[java.lang.String])
+ .defaultsTo(classOf[LineMessageReader].getName)
+ val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
+ "This allows custom configuration for a user-defined message reader.")
+ .withRequiredArg
+ .describedAs("prop")
+ .ofType(classOf[String])
+
+
+ val options = parser.parse(args : _*)
+ for(arg <- List(topicOpt, zkConnectOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val topic = options.valueOf(topicOpt)
+ val zkConnect = options.valueOf(zkConnectOpt)
+ val async = options.has(asyncOpt)
+ val compress = options.has(compressOpt)
+ val batchSize = options.valueOf(batchSizeOpt)
+ val sendTimeout = options.valueOf(sendTimeoutOpt)
+ val encoderClass = options.valueOf(messageEncoderOpt)
+ val readerClass = options.valueOf(messageReaderOpt)
+ val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+
+ val props = new Properties()
+ props.put("zk.connect", zkConnect)
+ props.put("compression.codec", DefaultCompressionCodec.codec.toString)
+ props.put("producer.type", if(async) "async" else "sync")
+ if(options.has(batchSizeOpt))
+ props.put("batch.size", batchSize)
+ props.put("queue.enqueueTimeout.ms", sendTimeout.toString)
+ props.put("serializer.class", encoderClass)
+
+ val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
+ reader.init(System.in, cmdLineProps)
+
+ val producer = new Producer[Any, Any](new ProducerConfig(props))
+
+ Runtime.getRuntime.addShutdownHook(new Thread() {
+ override def run() {
+ producer.close()
+ }
+ })
+
+ var message: AnyRef = null
+ do {
+ message = reader.readMessage()
+ if(message != null)
+ producer.send(new ProducerData(topic, message))
+ } while(message != null)
+ }
+
+ def parseLineReaderArgs(args: Iterable[String]): Properties = {
+ val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+ if(!splits.forall(_.length == 2)) {
+ System.err.println("Invalid line reader properties: " + args.mkString(" "))
+ System.exit(1)
+ }
+ val props = new Properties
+ for(a <- splits)
+ props.put(a(0), a(1))
+ props
+ }
+
+ trait MessageReader {
+ def init(inputStream: InputStream, props: Properties) {}
+ def readMessage(): AnyRef
+ def close() {}
+ }
+
+ class LineMessageReader extends MessageReader {
+ var reader: BufferedReader = null
+
+ override def init(inputStream: InputStream, props: Properties) {
+ reader = new BufferedReader(new InputStreamReader(inputStream))
+ }
+
+ override def readMessage() = reader.readLine()
+ }
+}