You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/02 21:07:08 UTC

git commit: kafka-1344; Kafka-console-producer.sh support snappy compression; patched by Ivan Lyutov; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 4f23ed32d -> 372182110


kafka-1344; Kafka-console-producer.sh support snappy compression; patched by Ivan Lyutov; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37218211
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37218211
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37218211

Branch: refs/heads/trunk
Commit: 3721821108ef344423ebe149a4dad96a3b2f989f
Parents: 4f23ed3
Author: Ivan Lyutov <iv...@gmail.com>
Authored: Wed Apr 2 12:07:02 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 2 12:07:02 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/producer/ConsoleProducer.scala   | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37218211/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 57386b1..27b0ec8 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -70,7 +70,11 @@ object ConsoleProducer {
       .describedAs("broker-list")
       .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
+                                                                  "If specified without value, than it defaults to 'gzip'")
+                                    .withOptionalArg()
+                                    .describedAs("compression-codec")
+                                    .ofType(classOf[String])
     val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
       .withRequiredArg
       .describedAs("size")
@@ -178,7 +182,12 @@ object ConsoleProducer {
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     val sync = options.has(syncOpt)
-    val compress = options.has(compressOpt)
+    val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
+    val compressionCodec = if (options.has(compressionCodecOpt))
+                             if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
+                               DefaultCompressionCodec.name
+                             else compressionCodecOptionValue
+                           else NoCompressionCodec.name
     val batchSize = options.valueOf(batchSizeOpt)
     val sendTimeout = options.valueOf(sendTimeoutOpt)
     val queueSize = options.valueOf(queueSizeOpt)
@@ -255,8 +264,7 @@ object ConsoleProducer {
   class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
-    props.put("compression.type", compression)
+    props.put("compression.type", producerConfig.compressionCodec)
     props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
     props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
     props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
@@ -287,8 +295,7 @@ object ConsoleProducer {
   class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
     val props = new Properties()
     props.put("metadata.broker.list", producerConfig.brokerList)
-    val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
-    props.put("compression.codec", codec.toString)
+    props.put("compression.codec", producerConfig.compressionCodec)
     props.put("producer.type", if(producerConfig.sync) "sync" else "async")
     props.put("batch.num.messages", producerConfig.batchSize.toString)
     props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)