You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/15 10:47:01 UTC

git commit: [SPARK-2399] Add support for LZ4 compression.

Repository: spark
Updated Branches:
  refs/heads/master 7446f5ff9 -> dd95abada


[SPARK-2399] Add support for LZ4 compression.

Based on Greg Bowyer's patch from JIRA https://issues.apache.org/jira/browse/SPARK-2399

Author: Reynold Xin <rx...@apache.org>

Closes #1416 from rxin/lz4 and squashes the following commits:

6c8fefe [Reynold Xin] Fixed typo.
8a14d38 [Reynold Xin] [SPARK-2399] Add support for LZ4 compression.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd95abad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd95abad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd95abad

Branch: refs/heads/master
Commit: dd95abada78b4d0aec97dacda50fdfd74464b073
Parents: 7446f5f
Author: Reynold Xin <rx...@apache.org>
Authored: Tue Jul 15 01:46:57 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Jul 15 01:46:57 2014 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  4 ++++
 .../org/apache/spark/io/CompressionCodec.scala  | 22 ++++++++++++++++++++
 .../apache/spark/io/CompressionCodecSuite.scala |  6 ++++++
 docs/configuration.md                           | 10 ++++++++-
 pom.xml                                         |  5 +++++
 5 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 4ed920a..1054cec 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -115,6 +115,10 @@
       <artifactId>snappy-java</artifactId>
     </dependency>
     <dependency>
+      <groupId>net.jpountz.lz4</groupId>
+      <artifactId>lz4</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 4b0fe1a..33402c9 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -61,6 +62,27 @@ private[spark] object CompressionCodec {
 
 /**
  * :: DeveloperApi ::
+ * LZ4 implementation of [[org.apache.spark.io.CompressionCodec]].
+ * Block size can be configured by `spark.io.compression.lz4.block.size`.
+ *
+ * Note: The wire protocol for this codec is not guaranteed to be compatible across versions
+ *       of Spark. This is intended for use as an internal compression utility within a single Spark
+ *       application.
+ */
+@DeveloperApi
+class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    val blockSize = conf.getInt("spark.io.compression.lz4.block.size", 32768)
+    new LZ4BlockOutputStream(s, blockSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+}
+
+
+/**
+ * :: DeveloperApi ::
  * LZF implementation of [[org.apache.spark.io.CompressionCodec]].
  *
  * Note: The wire protocol for this codec is not guaranteed to be compatible across versions

http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 68a0ea3..42fc395 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -50,6 +50,12 @@ class CompressionCodecSuite extends FunSuite {
     testCodec(codec)
   }
 
+  test("lz4 compression codec") {
+    val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
+    assert(codec.getClass === classOf[LZ4CompressionCodec])
+    testCodec(codec)
+  }
+
   test("lzf compression codec") {
     val codec = CompressionCodec.createCodec(conf, classOf[LZFCompressionCodec].getName)
     assert(codec.getClass === classOf[LZFCompressionCodec])

http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 07aa4c0..19fd980 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -350,7 +350,15 @@ Apart from these, the following properties are also available, and may be useful
   <td>32768</td>
   <td>
     Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
-    is used.
+    is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.compression.lz4.block.size</code></td>
+  <td>32768</td>
+  <td>
+    Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+    is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/dd95abad/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fa80707..d570f3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -298,6 +298,11 @@
         <version>1.0.5</version>
       </dependency>
       <dependency>
+        <groupId>net.jpountz.lz4</groupId>
+        <artifactId>lz4</artifactId>
+        <version>1.2.0</version>
+      </dependency>
+      <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>
         <version>2.7.0</version>