You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/02 21:07:08 UTC
git commit: kafka-1344;
Kafka-console-producer.sh support snappy compression; patched by Ivan Lyutov;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 4f23ed32d -> 372182110
kafka-1344; Kafka-console-producer.sh support snappy compression; patched by Ivan Lyutov; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37218211
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37218211
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37218211
Branch: refs/heads/trunk
Commit: 3721821108ef344423ebe149a4dad96a3b2f989f
Parents: 4f23ed3
Author: Ivan Lyutov <iv...@gmail.com>
Authored: Wed Apr 2 12:07:02 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Apr 2 12:07:02 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/producer/ConsoleProducer.scala | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/37218211/core/src/main/scala/kafka/producer/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
index 57386b1..27b0ec8 100644
--- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala
@@ -70,7 +70,11 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
+ "If specified without value, than it defaults to 'gzip'")
+ .withOptionalArg()
+ .describedAs("compression-codec")
+ .ofType(classOf[String])
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
.withRequiredArg
.describedAs("size")
@@ -178,7 +182,12 @@ object ConsoleProducer {
val topic = options.valueOf(topicOpt)
val brokerList = options.valueOf(brokerListOpt)
val sync = options.has(syncOpt)
- val compress = options.has(compressOpt)
+ val compressionCodecOptionValue = options.valueOf(compressionCodecOpt)
+ val compressionCodec = if (options.has(compressionCodecOpt))
+ if (compressionCodecOptionValue == null || compressionCodecOptionValue.isEmpty)
+ DefaultCompressionCodec.name
+ else compressionCodecOptionValue
+ else NoCompressionCodec.name
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
val queueSize = options.valueOf(queueSizeOpt)
@@ -255,8 +264,7 @@ object ConsoleProducer {
class NewShinyProducer(producerConfig: ProducerConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", producerConfig.brokerList)
- val compression = if(producerConfig.compress) DefaultCompressionCodec.name else NoCompressionCodec.name
- props.put("compression.type", compression)
+ props.put("compression.type", producerConfig.compressionCodec)
props.put("send.buffer.bytes", producerConfig.socketBuffer.toString)
props.put("metadata.fetch.backoff.ms", producerConfig.retryBackoffMs.toString)
props.put("metadata.expiry.ms", producerConfig.metadataExpiryMs.toString)
@@ -287,8 +295,7 @@ object ConsoleProducer {
class OldProducer(producerConfig: ConsoleProducer.ProducerConfig) extends Producer {
val props = new Properties()
props.put("metadata.broker.list", producerConfig.brokerList)
- val codec = if(producerConfig.compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
- props.put("compression.codec", codec.toString)
+ props.put("compression.codec", producerConfig.compressionCodec)
props.put("producer.type", if(producerConfig.sync) "sync" else "async")
props.put("batch.num.messages", producerConfig.batchSize.toString)
props.put("message.send.max.retries", producerConfig.messageSendMaxRetries.toString)