You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/11/01 13:54:13 UTC

spark git commit: [SPARK-19112][CORE] Support for ZStandard codec

Repository: spark
Updated Branches:
  refs/heads/master 07f390a27 -> 444bce1c9


[SPARK-19112][CORE] Support for ZStandard codec

## What changes were proposed in this pull request?

Using zstd compression for Spark jobs spilling 100s of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to a significant latency gain because of reduced disk IO operations. CPU time degrades by 2-5% because of zstd compression overhead, but for jobs that are bottlenecked by disk IO, this hit is acceptable.

## Benchmark
Please note that this benchmark uses a real-world, compute-heavy production workload spilling TBs of data to disk.

|         | zstd performance as compared to LZ4   |
| ------------- | -----:|
| spill/shuffle bytes    | -48% |
| cpu time    |    + 3% |
| cpu reservation time       |    -40%|
| latency     |     -40% |

## How was this patch tested?

Tested by running a few jobs spilling large amounts of data on the cluster; the amount of intermediate data written to disk was reduced by as much as 50%.

Author: Sital Kedia <sk...@fb.com>

Closes #18805 from sitalkedia/skedia/upstream_zstd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/444bce1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/444bce1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/444bce1c

Branch: refs/heads/master
Commit: 444bce1c98c45147fe63e2132e9743a0c5e49598
Parents: 07f390a
Author: Sital Kedia <sk...@fb.com>
Authored: Wed Nov 1 14:54:08 2017 +0100
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Wed Nov 1 14:54:08 2017 +0100

----------------------------------------------------------------------
 LICENSE                                         |  2 ++
 core/pom.xml                                    |  4 +++
 .../org/apache/spark/io/CompressionCodec.scala  | 36 ++++++++++++++++++--
 .../apache/spark/io/CompressionCodecSuite.scala | 18 ++++++++++
 dev/deps/spark-deps-hadoop-2.6                  |  1 +
 dev/deps/spark-deps-hadoop-2.7                  |  1 +
 docs/configuration.md                           | 20 ++++++++++-
 licenses/LICENSE-zstd-jni.txt                   | 26 ++++++++++++++
 licenses/LICENSE-zstd.txt                       | 30 ++++++++++++++++
 pom.xml                                         |  5 +++
 10 files changed, 140 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index 39fe0dc..c2b0d72 100644
--- a/LICENSE
+++ b/LICENSE
@@ -269,6 +269,8 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
      (BSD 3 Clause) DPark (https://github.com/douban/dpark/blob/master/LICENSE)
      (BSD 3 Clause) CloudPickle (https://github.com/cloudpipe/cloudpickle/blob/master/LICENSE)
+     (BSD 2 Clause) Zstd-jni (https://github.com/luben/zstd-jni/blob/master/LICENSE)
+     (BSD license) Zstd (https://github.com/facebook/zstd/blob/v1.3.1/LICENSE)
 
 ========================================================================
 MIT licenses

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 54f7a34..fa138d3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -199,6 +199,10 @@
       <artifactId>lz4-java</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.roaringbitmap</groupId>
       <artifactId>RoaringBitmap</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 27f2e42..7722db5 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.io
 import java.io._
 import java.util.Locale
 
+import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
@@ -50,13 +51,14 @@ private[spark] object CompressionCodec {
 
   private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
     (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
-      || codec.isInstanceOf[LZ4CompressionCodec])
+      || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
   }
 
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
-    "snappy" -> classOf[SnappyCompressionCodec].getName)
+    "snappy" -> classOf[SnappyCompressionCodec].getName,
+    "zstd" -> classOf[ZStdCompressionCodec].getName)
 
   def getCodecName(conf: SparkConf): String = {
     conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
@@ -219,3 +221,33 @@ private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends Ou
     }
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * ZStandard implementation of [[org.apache.spark.io.CompressionCodec]]. For more
+ * details see - http://facebook.github.io/zstd/
+ *
+ * @note The wire protocol for this codec is not guaranteed to be compatible across versions
+ * of Spark. This is intended for use as an internal compression utility within a single Spark
+ * application.
+ */
+@DeveloperApi
+class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
+
+  private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
+  // Default compression level for zstd compression to 1 because it is
+  // fastest of all with reasonably high compression ratio.
+  private val level = conf.getInt("spark.io.compression.zstd.level", 1)
+
+  override def compressedOutputStream(s: OutputStream): OutputStream = {
+    // Wrap the zstd output stream in a buffered output stream, so that we can
+    // avoid excessive JNI call overhead while trying to compress small amounts of data.
+    new BufferedOutputStream(new ZstdOutputStream(s, level), bufferSize)
+  }
+
+  override def compressedInputStream(s: InputStream): InputStream = {
+    // Wrap the zstd input stream in a buffered input stream so that we can
+    // avoid excessive JNI call overhead while trying to uncompress small amounts of data.
+    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 9e9c2b0..7b40e3e 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -104,6 +104,24 @@ class CompressionCodecSuite extends SparkFunSuite {
     testConcatenationOfSerializedStreams(codec)
   }
 
+  test("zstd compression codec") {
+    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testCodec(codec)
+  }
+
+  test("zstd compression codec short form") {
+    val codec = CompressionCodec.createCodec(conf, "zstd")
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testCodec(codec)
+  }
+
+  test("zstd supports concatenation of serialized zstd") {
+    val codec = CompressionCodec.createCodec(conf, classOf[ZStdCompressionCodec].getName)
+    assert(codec.getClass === classOf[ZStdCompressionCodec])
+    testConcatenationOfSerializedStreams(codec)
+  }
+
   test("bad compression codec") {
     intercept[IllegalArgumentException] {
       CompressionCodec.createCodec(conf, "foobar")

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 6e2fc63..21c8a75 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -189,3 +189,4 @@ xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
 xz-1.0.jar
 zookeeper-3.4.6.jar
+zstd-jni-1.3.2-2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index c2bbc25..7173426 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -190,3 +190,4 @@ xercesImpl-2.9.1.jar
 xmlenc-0.52.jar
 xz-1.0.jar
 zookeeper-3.4.6.jar
+zstd-jni-1.3.2-2.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index d3c358b..9b9583d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -889,7 +889,8 @@ Apart from these, the following properties are also available, and may be useful
     e.g.
     <code>org.apache.spark.io.LZ4CompressionCodec</code>,
     <code>org.apache.spark.io.LZFCompressionCodec</code>,
-    and <code>org.apache.spark.io.SnappyCompressionCodec</code>.
+    <code>org.apache.spark.io.SnappyCompressionCodec</code>,
+    and <code>org.apache.spark.io.ZStdCompressionCodec</code>.
   </td>
 </tr>
 <tr>
@@ -909,6 +910,23 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.io.compression.zstd.level</code></td>
+  <td>1</td>
+  <td>
+    Compression level for Zstd compression codec. Increasing the compression level will result in better
+    compression at the expense of more CPU and memory.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.compression.zstd.bufferSize</code></td>
+  <td>32k</td>
+  <td>
+    Buffer size used in Zstd compression, in the case when Zstd compression codec
+    is used. Lowering this size will lower the shuffle memory usage when Zstd is used, but it
+    might increase the compression cost because of excessive JNI call overhead.
+  </td>
+</tr>
+<tr>
   <td><code>spark.kryo.classesToRegister</code></td>
   <td>(none)</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/licenses/LICENSE-zstd-jni.txt
----------------------------------------------------------------------
diff --git a/licenses/LICENSE-zstd-jni.txt b/licenses/LICENSE-zstd-jni.txt
new file mode 100644
index 0000000..32c6bbd
--- /dev/null
+++ b/licenses/LICENSE-zstd-jni.txt
@@ -0,0 +1,26 @@
+Zstd-jni: JNI bindings to Zstd Library
+
+Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.
+
+BSD License
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+  list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice, this
+  list of conditions and the following disclaimer in the documentation and/or
+  other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/licenses/LICENSE-zstd.txt
----------------------------------------------------------------------
diff --git a/licenses/LICENSE-zstd.txt b/licenses/LICENSE-zstd.txt
new file mode 100644
index 0000000..a793a80
--- /dev/null
+++ b/licenses/LICENSE-zstd.txt
@@ -0,0 +1,30 @@
+BSD License
+
+For Zstandard software
+
+Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+   list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+   this list of conditions and the following disclaimer in the documentation
+   and/or other materials provided with the distribution.
+
+ * Neither the name Facebook nor the names of its contributors may be used to
+   endorse or promote products derived from this software without specific
+   prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

http://git-wip-us.apache.org/repos/asf/spark/blob/444bce1c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2d59f06..652aed4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -538,6 +538,11 @@
         <version>1.4.0</version>
       </dependency>
       <dependency>
+        <groupId>com.github.luben</groupId>
+        <artifactId>zstd-jni</artifactId>
+        <version>1.3.2-2</version>
+      </dependency>
+      <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>
         <version>2.7.0</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org