You are viewing a plain-text version of this content. The link to the canonical version (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@kafka.apache.org by jo...@apache.org on 2014/05/26 17:36:00 UTC
git commit: KAFKA-1456 Add LZ4 and LZ4C as a compression codec patch
by James Oliver reviewed by Joe Stein
Repository: kafka
Updated Branches:
refs/heads/trunk 52f1149dd -> 547ccedcf
KAFKA-1456 Add LZ4 and LZ4C as a compression codec patch by James Oliver reviewed by Joe Stein
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/547ccedc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/547ccedc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/547ccedc
Branch: refs/heads/trunk
Commit: 547ccedcfa081de2679f9607e4e868041c2cc55f
Parents: 52f1149
Author: Joe Stein <jo...@stealth.ly>
Authored: Mon May 26 11:34:50 2014 -0400
Committer: Joe Stein <jo...@stealth.ly>
Committed: Mon May 26 11:34:50 2014 -0400
----------------------------------------------------------------------
build.gradle | 1 +
.../kafka/common/record/CompressionType.java | 10 +++++-
.../apache/kafka/common/record/Compressor.java | 34 ++++++++++++++++++++
.../org/apache/kafka/common/record/Record.java | 4 +--
config/producer.properties | 4 +--
.../scala/kafka/message/CompressionCodec.scala | 14 ++++++++
.../kafka/message/CompressionFactory.scala | 9 ++++++
core/src/main/scala/kafka/message/Message.scala | 4 +--
.../kafka/message/MessageCompressionTest.scala | 23 +++++++++++++
.../producer_perf/config/server.properties | 4 +--
10 files changed, 98 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b3bbd77..2f4167f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -365,6 +365,7 @@ project(':clients') {
dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
compile 'org.xerial.snappy:snappy-java:1.0.5'
+ compile 'net.jpountz.lz4:lz4:1.2.0'
testCompile 'com.novocode:junit-interface:0.9'
testRuntime "$slf4jlog4j"
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index c557e44..5227b2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
* The compression type to use
*/
public enum CompressionType {
- NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
+ NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
public final int id;
public final String name;
@@ -40,6 +40,10 @@ public enum CompressionType {
return GZIP;
case 2:
return SNAPPY;
+ case 3:
+ return LZ4;
+ case 4:
+ return LZ4HC;
default:
throw new IllegalArgumentException("Unknown compression type id: " + id);
}
@@ -52,6 +56,10 @@ public enum CompressionType {
return GZIP;
else if (SNAPPY.name.equals(name))
return SNAPPY;
+ else if (LZ4.name.equals(name))
+ return LZ4;
+ else if (LZ4HC.name.equals(name))
+ return LZ4HC;
else
throw new IllegalArgumentException("Unknown compression name: " + name);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 6ae3d06..0fa6dd2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -208,6 +208,29 @@ public class Compressor {
} catch (Exception e) {
throw new KafkaException(e);
}
+ case LZ4:
+ try {
+ Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
+ OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+ .newInstance(buffer);
+ return new DataOutputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
+ case LZ4HC:
+ try {
+ Class<?> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
+ Class<?> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
+ Class<?> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
+ Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
+ Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
+ OutputStream stream = (OutputStream) lz4BlockOutputStream
+ .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
+ .newInstance(buffer, 1 << 16, compressor);
+ return new DataOutputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
@@ -234,6 +257,17 @@ public class Compressor {
} catch (Exception e) {
throw new KafkaException(e);
}
+ case LZ4:
+ case LZ4HC:
+ // dynamically load LZ4 class to avoid runtime dependency
+ try {
+ Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+ InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
+ .newInstance(buffer);
+ return new DataInputStream(stream);
+ } catch (Exception e) {
+ throw new KafkaException(e);
+ }
default:
throw new IllegalArgumentException("Unknown compression type: " + type);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index ce1177e..10df9fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -57,10 +57,10 @@ public final class Record {
public static final byte CURRENT_MAGIC_VALUE = 0;
/**
- * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
+ * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
* compression
*/
- public static final int COMPRESSION_CODEC_MASK = 0x03;
+ public static final int COMPRESSION_CODEC_MASK = 0x07;
/**
* Compression code for uncompressed records
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index 52a7611..39d65d7 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
+# the old numeric config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectively
compression.codec=none
# message encoder
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 8762a79..de0a0fa 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -23,6 +23,8 @@ object CompressionCodec {
case NoCompressionCodec.codec => NoCompressionCodec
case GZIPCompressionCodec.codec => GZIPCompressionCodec
case SnappyCompressionCodec.codec => SnappyCompressionCodec
+ case LZ4CompressionCodec.codec => LZ4CompressionCodec
+ case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
}
}
@@ -31,6 +33,8 @@ object CompressionCodec {
case NoCompressionCodec.name => NoCompressionCodec
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
+ case LZ4CompressionCodec.name => LZ4CompressionCodec
+ case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
}
}
@@ -53,6 +57,16 @@ case object SnappyCompressionCodec extends CompressionCodec {
val name = "snappy"
}
+case object LZ4CompressionCodec extends CompressionCodec {
+ val codec = 3
+ val name = "lz4"
+}
+
+case object LZ4HCCompressionCodec extends CompressionCodec {
+ val codec = 4
+ val name = "lz4hc"
+}
+
case object NoCompressionCodec extends CompressionCodec {
val codec = 0
val name = "none"
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index ca833ee..8420e13 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -31,6 +31,12 @@ object CompressionFactory {
case SnappyCompressionCodec =>
import org.xerial.snappy.SnappyOutputStream
new SnappyOutputStream(stream)
+ case LZ4CompressionCodec =>
+ import net.jpountz.lz4.LZ4BlockOutputStream
+ new LZ4BlockOutputStream(stream)
+ case LZ4HCCompressionCodec =>
+ import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
+ new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
@@ -43,6 +49,9 @@ object CompressionFactory {
case SnappyCompressionCodec =>
import org.xerial.snappy.SnappyInputStream
new SnappyInputStream(stream)
+ case LZ4CompressionCodec | LZ4HCCompressionCodec =>
+ import net.jpountz.lz4.LZ4BlockInputStream
+ new LZ4BlockInputStream(stream)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 52c082f..d2a7293 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -54,10 +54,10 @@ object Message {
val CurrentMagicValue: Byte = 0
/**
- * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+ * Specifies the mask for the compression code. 3 bits to hold the compression codec.
* 0 is reserved to indicate no compression
*/
- val CompressionCodeMask: Int = 0x03
+ val CompressionCodeMask: Int = 0x07
/**
* Compression code for uncompressed messages
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index ed22931..6f0addc 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -30,6 +30,10 @@ class MessageCompressionTest extends JUnitSuite {
val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
if(isSnappyAvailable)
codecs += SnappyCompressionCodec
+ if(isLZ4Available)
+ codecs += LZ4CompressionCodec
+ if (izLZ4HCAvailable)
+ codecs += LZ4HCCompressionCodec
for(codec <- codecs)
testSimpleCompressDecompress(codec)
}
@@ -61,4 +65,23 @@ class MessageCompressionTest extends JUnitSuite {
case e: org.xerial.snappy.SnappyError => false
}
}
+
+ def isLZ4Available(): Boolean = {
+ try {
+ val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
+ true
+ } catch {
+ case e: UnsatisfiedLinkError => false
+ }
+ }
+
+ def izLZ4HCAvailable(): Boolean = {
+ try {
+ val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16,
+ net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
+ true
+ } catch {
+ case e: UnsatisfiedLinkError => false
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/system_test/producer_perf/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties
index 9f8a633..83a1e06 100644
--- a/system_test/producer_perf/config/server.properties
+++ b/system_test/producer_perf/config/server.properties
@@ -60,10 +60,10 @@ enable.zookeeper=true
# zk connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
# time based topic flush intervals in ms
#log.flush.intervals.ms.per.topic=topic:1000