You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/10/13 01:26:29 UTC

git commit: KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 7062ed7db -> 0d65f043f


KAFKA-1471 Add producer unit tests for LZ4 and LZ4HC compression codecs; patched by James Oliver; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d65f043
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d65f043
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d65f043

Branch: refs/heads/trunk
Commit: 0d65f043fed14c482a788d403c6a05544c0dd01b
Parents: 7062ed7
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sun Oct 12 16:15:40 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sun Oct 12 16:15:54 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/producer/ProducerConfig.java   | 4 ++--
 core/src/main/scala/kafka/tools/ConsoleProducer.scala            | 4 ++--
 core/src/main/scala/kafka/tools/PerfConfig.scala                 | 2 +-
 .../scala/integration/kafka/api/ProducerCompressionTest.scala    | 3 +++
 core/src/test/scala/unit/kafka/message/MessageTest.scala         | 2 +-
 5 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 79d57f9..bf4ed66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -153,8 +153,8 @@ public class ProducerConfig extends AbstractConfig {
 
     /** <code>compression.type</code> */
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
-    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, or <code>snappy</code>. Compression is of full batches of data, "
-                                                       + " so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
+    private static final String COMPRESSION_TYPE_DOC = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid " + " values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or <code>lz4hc</code>. "
+                                                       + "Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).";
 
     /** <code>metrics.sample.window.ms</code> */
     public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 8e9ba0b..b024a69 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -113,8 +113,8 @@ object ConsoleProducer {
       .describedAs("broker-list")
       .ofType(classOf[String])
     val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
-    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'gzip' or 'snappy'." +
-                                                                  "If specified without value, than it defaults to 'gzip'")
+    val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'lz4hc'." +
+                                                                  "If specified without value, then it defaults to 'gzip'")
                                     .withOptionalArg()
                                     .describedAs("compression-codec")
                                     .ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/main/scala/kafka/tools/PerfConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala
index 129cc01..c720029 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -53,7 +53,7 @@ class PerfConfig(args: Array[String]) {
     .defaultsTo(200)
   val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed")
     .withRequiredArg
-    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2")
+    .describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3, LZ4HCCompressionCodec as 4")
     .ofType(classOf[java.lang.Integer])
     .defaultsTo(0)
   val helpOpt = parser.accepts("help", "Print usage.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 17e2c6e..c954851 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -121,8 +121,11 @@ object ProducerCompressionTest {
   @Parameters
   def parameters: Collection[Array[String]] = {
     val list = new ArrayList[Array[String]]()
+    list.add(Array("none"))
     list.add(Array("gzip"))
     list.add(Array("snappy"))
+    list.add(Array("lz4"))
+    list.add(Array("lz4hc"))
     list
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0d65f043/core/src/test/scala/unit/kafka/message/MessageTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageTest.scala b/core/src/test/scala/unit/kafka/message/MessageTest.scala
index 4837585..958c1a6 100644
--- a/core/src/test/scala/unit/kafka/message/MessageTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageTest.scala
@@ -39,7 +39,7 @@ class MessageTest extends JUnitSuite {
   def setUp(): Unit = {
     val keys = Array(null, "key".getBytes, "".getBytes)
     val vals = Array("value".getBytes, "".getBytes, null)
-    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec)
+    val codecs = Array(NoCompressionCodec, GZIPCompressionCodec, SnappyCompressionCodec, LZ4CompressionCodec, LZ4HCCompressionCodec)
     for(k <- keys; v <- vals; codec <- codecs)
       messages += new MessageTestVal(k, v, codec, new Message(v, k, codec))
   }