You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jo...@apache.org on 2014/05/26 17:36:00 UTC

git commit: KAFKA-1456 Add LZ4 and LZ4C as a compression codec patch by James Oliver reviewed by Joe Stein

Repository: kafka
Updated Branches:
  refs/heads/trunk 52f1149dd -> 547ccedcf


KAFKA-1456 Add LZ4 and LZ4C as a compression codec patch by James Oliver reviewed by Joe Stein


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/547ccedc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/547ccedc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/547ccedc

Branch: refs/heads/trunk
Commit: 547ccedcfa081de2679f9607e4e868041c2cc55f
Parents: 52f1149
Author: Joe Stein <jo...@stealth.ly>
Authored: Mon May 26 11:34:50 2014 -0400
Committer: Joe Stein <jo...@stealth.ly>
Committed: Mon May 26 11:34:50 2014 -0400

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../kafka/common/record/CompressionType.java    | 10 +++++-
 .../apache/kafka/common/record/Compressor.java  | 34 ++++++++++++++++++++
 .../org/apache/kafka/common/record/Record.java  |  4 +--
 config/producer.properties                      |  4 +--
 .../scala/kafka/message/CompressionCodec.scala  | 14 ++++++++
 .../kafka/message/CompressionFactory.scala      |  9 ++++++
 core/src/main/scala/kafka/message/Message.scala |  4 +--
 .../kafka/message/MessageCompressionTest.scala  | 23 +++++++++++++
 .../producer_perf/config/server.properties      |  4 +--
 10 files changed, 98 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index b3bbd77..2f4167f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -365,6 +365,7 @@ project(':clients') {
   dependencies {
     compile "org.slf4j:slf4j-api:1.7.6"
     compile 'org.xerial.snappy:snappy-java:1.0.5'
+    compile 'net.jpountz.lz4:lz4:1.2.0'
 
     testCompile 'com.novocode:junit-interface:0.9'
     testRuntime "$slf4jlog4j"

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index c557e44..5227b2d 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -20,7 +20,7 @@ package org.apache.kafka.common.record;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f);
+    NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f), LZ4HC(4, "lz4hc", 0.5f);
 
     public final int id;
     public final String name;
@@ -40,6 +40,10 @@ public enum CompressionType {
                 return GZIP;
             case 2:
                 return SNAPPY;
+            case 3:
+                return LZ4;
+            case 4:
+                return LZ4HC;
             default:
                 throw new IllegalArgumentException("Unknown compression type id: " + id);
         }
@@ -52,6 +56,10 @@ public enum CompressionType {
             return GZIP;
         else if (SNAPPY.name.equals(name))
             return SNAPPY;
+        else if (LZ4.name.equals(name))
+            return LZ4;
+        else if (LZ4HC.name.equals(name))
+            return LZ4HC;
         else
             throw new IllegalArgumentException("Unknown compression name: " + name);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 6ae3d06..0fa6dd2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -208,6 +208,29 @@ public class Compressor {
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
+                case LZ4:
+                    try {
+                        Class LZ4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
+                        OutputStream stream = (OutputStream) LZ4BlockOutputStream.getConstructor(OutputStream.class)
+                            .newInstance(buffer);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
+                case LZ4HC:
+                    try {
+                        Class<?> factoryClass = Class.forName("net.jpountz.lz4.LZ4Factory");
+                        Class<?> compressorClass = Class.forName("net.jpountz.lz4.LZ4Compressor");
+                        Class<?> lz4BlockOutputStream = Class.forName("net.jpountz.lz4.LZ4BlockOutputStream");
+                        Object factory = factoryClass.getMethod("fastestInstance").invoke(null);
+                        Object compressor = factoryClass.getMethod("highCompressor").invoke(factory);
+                        OutputStream stream = (OutputStream) lz4BlockOutputStream
+                            .getConstructor(OutputStream.class, Integer.TYPE, compressorClass)
+                            .newInstance(buffer, 1 << 16, compressor);
+                        return new DataOutputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
                 default:
                     throw new IllegalArgumentException("Unknown compression type: " + type);
             }
@@ -234,6 +257,17 @@ public class Compressor {
                     } catch (Exception e) {
                         throw new KafkaException(e);
                     }
+                case LZ4:
+                case LZ4HC:
+                    // dynamically load LZ4 class to avoid runtime dependency
+                    try {
+                        Class inputStreamClass = Class.forName("net.jpountz.lz4.LZ4BlockInputStream");
+                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
+                            .newInstance(buffer);
+                        return new DataInputStream(stream);
+                    } catch (Exception e) {
+                        throw new KafkaException(e);
+                    }
                 default:
                     throw new IllegalArgumentException("Unknown compression type: " + type);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index ce1177e..10df9fd 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -57,10 +57,10 @@ public final class Record {
     public static final byte CURRENT_MAGIC_VALUE = 0;
 
     /**
-     * Specifies the mask for the compression code. 2 bits to hold the compression codec. 0 is reserved to indicate no
+     * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
      * compression
      */
-    public static final int COMPRESSION_CODEC_MASK = 0x03;
+    public static final int COMPRESSION_CODEC_MASK = 0x07;
 
     /**
      * Compression code for uncompressed records

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/config/producer.properties
----------------------------------------------------------------------
diff --git a/config/producer.properties b/config/producer.properties
index 52a7611..39d65d7 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -26,8 +26,8 @@ metadata.broker.list=localhost:9092
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
-# specify the compression codec for all data generated: none , gzip, snappy.
-# the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally
+# specify the compression codec for all data generated: none, gzip, snappy, lz4, lz4hc.
+# the old config values work as well: 0, 1, 2, 3, 4 for none, gzip, snappy, lz4, lz4hc, respectively
 compression.codec=none
 
 # message encoder

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 8762a79..de0a0fa 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -23,6 +23,8 @@ object CompressionCodec {
       case NoCompressionCodec.codec => NoCompressionCodec
       case GZIPCompressionCodec.codec => GZIPCompressionCodec
       case SnappyCompressionCodec.codec => SnappyCompressionCodec
+      case LZ4CompressionCodec.codec => LZ4CompressionCodec
+      case LZ4HCCompressionCodec.codec => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
     }
   }
@@ -31,6 +33,8 @@ object CompressionCodec {
       case NoCompressionCodec.name => NoCompressionCodec
       case GZIPCompressionCodec.name => GZIPCompressionCodec
       case SnappyCompressionCodec.name => SnappyCompressionCodec
+      case LZ4CompressionCodec.name => LZ4CompressionCodec
+      case LZ4HCCompressionCodec.name => LZ4HCCompressionCodec
       case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
     }
   }
@@ -53,6 +57,16 @@ case object SnappyCompressionCodec extends CompressionCodec {
   val name = "snappy"
 }
 
+case object LZ4CompressionCodec extends CompressionCodec {
+  val codec = 3
+  val name = "lz4"
+}
+
+case object LZ4HCCompressionCodec extends CompressionCodec {
+  val codec = 4
+  val name = "lz4hc"
+}
+
 case object NoCompressionCodec extends CompressionCodec {
   val codec = 0
   val name = "none"

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index ca833ee..8420e13 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -31,6 +31,12 @@ object CompressionFactory {
       case SnappyCompressionCodec => 
         import org.xerial.snappy.SnappyOutputStream
         new SnappyOutputStream(stream)
+      case LZ4CompressionCodec =>
+        import net.jpountz.lz4.LZ4BlockOutputStream
+        new LZ4BlockOutputStream(stream)
+      case LZ4HCCompressionCodec =>
+        import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory}
+        new LZ4BlockOutputStream(stream, 1 << 16, LZ4Factory.fastestInstance().highCompressor())
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }
@@ -43,6 +49,9 @@ object CompressionFactory {
       case SnappyCompressionCodec => 
         import org.xerial.snappy.SnappyInputStream
         new SnappyInputStream(stream)
+      case LZ4CompressionCodec | LZ4HCCompressionCodec =>
+        import net.jpountz.lz4.LZ4BlockInputStream
+        new LZ4BlockInputStream(stream)
       case _ =>
         throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 52c082f..d2a7293 100644
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -54,10 +54,10 @@ object Message {
   val CurrentMagicValue: Byte = 0
 
   /**
-   * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+   * Specifies the mask for the compression code. 3 bits to hold the compression codec.
    * 0 is reserved to indicate no compression
    */
-  val CompressionCodeMask: Int = 0x03 
+  val CompressionCodeMask: Int = 0x07
 
   /**
    * Compression code for uncompressed messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index ed22931..6f0addc 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -30,6 +30,10 @@ class MessageCompressionTest extends JUnitSuite {
     val codecs = mutable.ArrayBuffer[CompressionCodec](GZIPCompressionCodec)
     if(isSnappyAvailable)
       codecs += SnappyCompressionCodec
+    if(isLZ4Available)
+      codecs += LZ4CompressionCodec
+    if (izLZ4HCAvailable)
+      codecs += LZ4HCCompressionCodec
     for(codec <- codecs)
       testSimpleCompressDecompress(codec)
   }
@@ -61,4 +65,23 @@ class MessageCompressionTest extends JUnitSuite {
       case e: org.xerial.snappy.SnappyError => false
     }
   }
+
+  def isLZ4Available(): Boolean = {
+    try {
+      val lz4 = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream())
+      true
+    } catch {
+      case e: UnsatisfiedLinkError => false
+    }
+  }
+
+  def izLZ4HCAvailable(): Boolean = {
+    try {
+      val lz4hc = new net.jpountz.lz4.LZ4BlockOutputStream(new ByteArrayOutputStream(), 1 << 16, 
+        net.jpountz.lz4.LZ4Factory.fastestInstance().highCompressor())
+      true
+    } catch {
+      case e: UnsatisfiedLinkError => false
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/547ccedc/system_test/producer_perf/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/producer_perf/config/server.properties b/system_test/producer_perf/config/server.properties
index 9f8a633..83a1e06 100644
--- a/system_test/producer_perf/config/server.properties
+++ b/system_test/producer_perf/config/server.properties
@@ -60,10 +60,10 @@ enable.zookeeper=true
 # zk connection string
 # comma separated host:port pairs, each corresponding to a zk
 # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
+zookeeper.connect=localhost:2181
 
 # timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
+zookeeper.connection.timeout.ms=1000000
 
 # time based topic flush intervals in ms
 #log.flush.intervals.ms.per.topic=topic:1000