You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/10/13 01:26:29 UTC
git commit: KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC
compression codecs; patched by James Oliver; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 7062ed7db -> 0d65f043f
KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d65f043
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d65f043
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d65f043
Branch: refs/heads/trunk
Commit: 0d65f043fed14c482a788d403c6a05544c0dd01b
Parents: 7062ed7
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sun Oct 12 16:15:40 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Oct 12 16:15:54 2014 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/producer/ProducerConfig.java | 4 ++--
core/src/main/scala/kafka/tools/ConsoleProducer.scala | 4 ++--
core/src/main/scala/kafka/tools/PerfConfig.scala | 2 +-
.../scala/integration/kafka/api/ProducerCompressionTest.scala | 3 +++
core/src/test/scala/unit/kafka/message/MessageTest.scala | 2 +-
5 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 79d57f9..bf4ed66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -153,8 +153,8 @@ public class ProducerConfig extends AbstractConfig {
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
- private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, or <code>snappy</code>. Compression is of full batches of data, "
- + " so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
+ private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
+ + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 8e9ba0b..b024a69 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -113,8 +113,8 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
- "If specified without value, than it defaults to 'gzip'")
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+ "If specified without value, then it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")
.ofType(classOf[String])
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 129cc01..c720029 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
.defaultsTo(200)
val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
.withRequiredArg
- .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
+ .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val helpOpt = parser.accepts("help", "Print usage.")
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 17e2c6e..c954851 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -121,8 +121,11 @@ object ProducerCompressionTest {
@Parameters
def parameters: Collection[Array[String]] = {
val list = new ArrayList[Array[String]]()
+ list.add(Array("none"))
list.add(Array("gzip"))
list.add(Array("snappy"))
+ list.add(Array("lz4"))
+ list.add(Array("lz4hc"))
list
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 4837585..958c1a6 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
def setUp(): Unit = {
val keys = Array(null, "key".getBytes, "".getBytes)
val vals = Array("value".getBytes, "".getBytes, null)
- val codecs = Array(NoCompressionCodec, GZIPCompressionCodec)
+ val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
for(k <- keys; v <- vals; codec <- codecs)
messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
}