Posted to commits@spark.apache.org by sr...@apache.org on 2017/08/09 15:31:57 UTC

spark git commit: [SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0)

Repository: spark
Updated Branches:
  refs/heads/master 83fe3b5e1 -> b78cf13bf


[SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0)

## What changes were proposed in this pull request?
This PR updates `lz4-java` to the latest release (v1.4.0) and removes the custom `LZ4BlockInputStream`. Spark currently uses a custom `LZ4BlockInputStream` to read concatenated byte streams in shuffle, but this functionality has been implemented in the latest lz4-java (https://github.com/lz4/lz4-java/pull/105), so we can move to the latest release and drop the custom `LZ4BlockInputStream`.
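
For context, a minimal sketch (not part of the commit) of the lz4-java 1.4.0 behavior this change relies on. The two-argument `LZ4BlockInputStream` constructor is the same one the `CompressionCodec.scala` hunk below calls with `disableConcatenationOfByteStream = false`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4BlockInputStream;
import net.jpountz.lz4.LZ4BlockOutputStream;

public class ConcatenatedStreamsExample {
  public static void main(String[] args) throws IOException {
    // Two independent LZ4 block streams written back to back, as a shuffle
    // output file can contain.
    ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
    for (String chunk : new String[] {"first stream;", "second stream"}) {
      // Closing the block stream writes its end mark; closing the
      // underlying ByteArrayOutputStream is a no-op.
      try (LZ4BlockOutputStream out = new LZ4BlockOutputStream(concatenated)) {
        out.write(chunk.getBytes(StandardCharsets.UTF_8));
      }
    }

    // Second argument (stopOnEmptyBlock) = false: keep reading past each
    // stream's end mark instead of treating the first one as EOF.
    try (LZ4BlockInputStream in = new LZ4BlockInputStream(
        new ByteArrayInputStream(concatenated.toByteArray()), false)) {
      byte[] buf = new byte[64];
      StringBuilder decoded = new StringBuilder();
      for (int n; (n = in.read(buf)) != -1; ) {
        decoded.append(new String(buf, 0, n, StandardCharsets.UTF_8));
      }
      System.out.println(decoded);  // prints "first stream;second stream"
    }
  }
}
```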

The major diffs between the latest release and the v1.3.0 currently in master are as follows (https://github.com/lz4/lz4-java/compare/62f7547abb0819d1ca1e669645ee1a9d26cd60b0...6d4693f56253fcddfad7b441bb8d917b182efa2d):
- Fixed an NPE in XXHashFactory similarly
- Don't place resources in the default package, to support shading
- Fixed ByteBuffer methods failing to apply arrayOffset() for array-backed buffers
- Try to load lz4-java from java.library.path, then fall back to the bundled binary
- Add a ppc64le binary
- Add an s390x JNI binding
- Add basic LZ4 Frame v1.5.0 support (see the sketch after this list)
- Enable aarch64 support for lz4-java
- Allow unsafeInstance() for the ppc64le architecture
- Add unsafeInstance support for AArch64
- Support 64-bit JNI builds on Solaris
- Avoid over-allocating a buffer
- Allow EndMark to be incompressible for LZ4FrameInputStream
- Support concatenated byte streams
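
Of these, the basic LZ4 Frame support is new API surface worth a quick illustration. A minimal sketch, assuming the `LZ4FrameOutputStream`/`LZ4FrameInputStream` classes that 1.4.0 ships (Spark itself does not use them in this PR):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import net.jpountz.lz4.LZ4FrameInputStream;
import net.jpountz.lz4.LZ4FrameOutputStream;

public class FrameFormatExample {
  public static void main(String[] args) throws IOException {
    // Round-trip through the LZ4 Frame format, which is interoperable with
    // the `lz4` command-line tool, unlike the block format above.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (LZ4FrameOutputStream out = new LZ4FrameOutputStream(compressed)) {
      out.write("hello, frame format".getBytes(StandardCharsets.UTF_8));
    }

    try (LZ4FrameInputStream in = new LZ4FrameInputStream(
        new ByteArrayInputStream(compressed.toByteArray()))) {
      byte[] buf = new byte[64];
      int n = in.read(buf);  // a single read suffices for this tiny payload
      System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
    }
  }
}
```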

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #18883 from maropu/SPARK-21276.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b78cf13b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b78cf13b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b78cf13b

Branch: refs/heads/master
Commit: b78cf13bf05f0eadd7ae97df84b6e1505dc5ff9f
Parents: 83fe3b5
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Aug 9 17:31:52 2017 +0200
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Aug 9 17:31:52 2017 +0200

----------------------------------------------------------------------
 core/pom.xml                                    |   4 +-
 .../apache/spark/io/LZ4BlockInputStream.java    | 260 -------------------
 .../org/apache/spark/io/CompressionCodec.scala  |   7 +-
 dev/deps/spark-deps-hadoop-2.6                  |   2 +-
 dev/deps/spark-deps-hadoop-2.7                  |   2 +-
 external/kafka-0-10-assembly/pom.xml            |   4 +-
 external/kafka-0-8-assembly/pom.xml             |   4 +-
 pom.xml                                         |   6 +-
 project/MimaExcludes.scala                      |   5 +-
 9 files changed, 20 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index bc6b1c4..431967e 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -190,8 +190,8 @@
       <artifactId>snappy-java</artifactId>
     </dependency>
     <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
     </dependency>
     <dependency>
       <groupId>org.roaringbitmap</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java
deleted file mode 100644
index 9d6f06e..0000000
--- a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.io;
-
-import java.io.EOFException;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.zip.Checksum;
-
-import net.jpountz.lz4.LZ4Exception;
-import net.jpountz.lz4.LZ4Factory;
-import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.util.SafeUtils;
-import net.jpountz.xxhash.XXHashFactory;
-
-/**
- * {@link InputStream} implementation to decode data written with
- * {@link net.jpountz.lz4.LZ4BlockOutputStream}. This class is not thread-safe and does not
- * support {@link #mark(int)}/{@link #reset()}.
- * @see net.jpountz.lz4.LZ4BlockOutputStream
- *
- * This is based on net.jpountz.lz4.LZ4BlockInputStream
- *
- * changes: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411
- *
- * TODO: merge this into upstream
- */
-public final class LZ4BlockInputStream extends FilterInputStream {
-
-  // Copied from net.jpountz.lz4.LZ4BlockOutputStream
-  static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' };
-  static final int MAGIC_LENGTH = MAGIC.length;
-
-  static final int HEADER_LENGTH =
-    MAGIC_LENGTH // magic bytes
-      + 1          // token
-      + 4          // compressed length
-      + 4          // decompressed length
-      + 4;         // checksum
-
-  static final int COMPRESSION_LEVEL_BASE = 10;
-
-  static final int COMPRESSION_METHOD_RAW = 0x10;
-  static final int COMPRESSION_METHOD_LZ4 = 0x20;
-
-  static final int DEFAULT_SEED = 0x9747b28c;
-
-  private final LZ4FastDecompressor decompressor;
-  private final Checksum checksum;
-  private byte[] buffer;
-  private byte[] compressedBuffer;
-  private int originalLen;
-  private int o;
-  private boolean finished;
-
-  /**
-   * Create a new {@link InputStream}.
-   *
-   * @param in            the {@link InputStream} to poll
-   * @param decompressor  the {@link LZ4FastDecompressor decompressor} instance to
-   *                      use
-   * @param checksum      the {@link Checksum} instance to use, must be
-   *                      equivalent to the instance which has been used to
-   *                      write the stream
-   */
-  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
-    super(in);
-    this.decompressor = decompressor;
-    this.checksum = checksum;
-    this.buffer = new byte[0];
-    this.compressedBuffer = new byte[HEADER_LENGTH];
-    o = originalLen = 0;
-    finished = false;
-  }
-
-  /**
-   * Create a new instance using {@link net.jpountz.xxhash.XXHash32} for checksuming.
-   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum)
-   * @see net.jpountz.xxhash.StreamingXXHash32#asChecksum()
-   */
-  public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
-    this(in, decompressor,
-      XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum());
-  }
-
-  /**
-   * Create a new instance which uses the fastest {@link LZ4FastDecompressor} available.
-   * @see LZ4Factory#fastestInstance()
-   * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
-   */
-  public LZ4BlockInputStream(InputStream in) {
-    this(in, LZ4Factory.fastestInstance().fastDecompressor());
-  }
-
-  @Override
-  public int available() throws IOException {
-    refill();
-    return originalLen - o;
-  }
-
-  @Override
-  public int read() throws IOException {
-    refill();
-    if (finished) {
-      return -1;
-    }
-    return buffer[o++] & 0xFF;
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) throws IOException {
-    SafeUtils.checkRange(b, off, len);
-    refill();
-    if (finished) {
-      return -1;
-    }
-    len = Math.min(len, originalLen - o);
-    System.arraycopy(buffer, o, b, off, len);
-    o += len;
-    return len;
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    refill();
-    if (finished) {
-      return -1;
-    }
-    final int skipped = (int) Math.min(n, originalLen - o);
-    o += skipped;
-    return skipped;
-  }
-
-  private void refill() throws IOException {
-    if (finished || o < originalLen) {
-      return;
-    }
-    try {
-      readFully(compressedBuffer, HEADER_LENGTH);
-    } catch (EOFException e) {
-      finished = true;
-      return;
-    }
-    for (int i = 0; i < MAGIC_LENGTH; ++i) {
-      if (compressedBuffer[i] != MAGIC[i]) {
-        throw new IOException("Stream is corrupted");
-      }
-    }
-    final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF;
-    final int compressionMethod = token & 0xF0;
-    final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F);
-    if (compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4)
-    {
-      throw new IOException("Stream is corrupted");
-    }
-    final int compressedLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1);
-    originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5);
-    final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
-    assert HEADER_LENGTH == MAGIC_LENGTH + 13;
-    if (originalLen > 1 << compressionLevel
-      || originalLen < 0
-      || compressedLen < 0
-      || (originalLen == 0 && compressedLen != 0)
-      || (originalLen != 0 && compressedLen == 0)
-      || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
-      throw new IOException("Stream is corrupted");
-    }
-    if (originalLen == 0 && compressedLen == 0) {
-      if (check != 0) {
-        throw new IOException("Stream is corrupted");
-      }
-      refill();
-      return;
-    }
-    if (buffer.length < originalLen) {
-      buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
-    }
-    switch (compressionMethod) {
-      case COMPRESSION_METHOD_RAW:
-        readFully(buffer, originalLen);
-        break;
-      case COMPRESSION_METHOD_LZ4:
-        if (compressedBuffer.length < originalLen) {
-          compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
-        }
-        readFully(compressedBuffer, compressedLen);
-        try {
-          final int compressedLen2 =
-            decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
-          if (compressedLen != compressedLen2) {
-            throw new IOException("Stream is corrupted");
-          }
-        } catch (LZ4Exception e) {
-          throw new IOException("Stream is corrupted", e);
-        }
-        break;
-      default:
-        throw new AssertionError();
-    }
-    checksum.reset();
-    checksum.update(buffer, 0, originalLen);
-    if ((int) checksum.getValue() != check) {
-      throw new IOException("Stream is corrupted");
-    }
-    o = 0;
-  }
-
-  private void readFully(byte[] b, int len) throws IOException {
-    int read = 0;
-    while (read < len) {
-      final int r = in.read(b, read, len - read);
-      if (r < 0) {
-        throw new EOFException("Stream ended prematurely");
-      }
-      read += r;
-    }
-    assert len == read;
-  }
-
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-
-  @SuppressWarnings("sync-override")
-  @Override
-  public void mark(int readlimit) {
-    // unsupported
-  }
-
-  @SuppressWarnings("sync-override")
-  @Override
-  public void reset() throws IOException {
-    throw new IOException("mark/reset not supported");
-  }
-
-  @Override
-  public String toString() {
-    return getClass().getSimpleName() + "(in=" + in
-      + ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0cb16f0..27f2e42 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.util.Locale
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-import net.jpountz.lz4.LZ4BlockOutputStream
+import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
@@ -115,7 +115,10 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
     new LZ4BlockOutputStream(s, blockSize)
   }
 
-  override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)
+  override def compressedInputStream(s: InputStream): InputStream = {
+    val disableConcatenationOfByteStream = false
+    new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/dev/deps/spark-deps-hadoop-2.6
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index d7587fb..83070a9 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -133,7 +133,7 @@ leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-lz4-1.3.0.jar
+lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/dev/deps/spark-deps-hadoop-2.7
----------------------------------------------------------------------
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 887eeca..5481e25 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -134,7 +134,7 @@ leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-lz4-1.3.0.jar
+lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
 mail-1.4.7.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/external/kafka-0-10-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml
index 75df886..d6f9731 100644
--- a/external/kafka-0-10-assembly/pom.xml
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -65,8 +65,8 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/external/kafka-0-8-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml
index f9c2dcb..7863494 100644
--- a/external/kafka-0-8-assembly/pom.xml
+++ b/external/kafka-0-8-assembly/pom.xml
@@ -65,8 +65,8 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>net.jpountz.lz4</groupId>
-      <artifactId>lz4</artifactId>
+      <groupId>org.lz4</groupId>
+      <artifactId>lz4-java</artifactId>
       <scope>provided</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 500fa1c..9616f6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -531,9 +531,9 @@
         <scope>${hadoop.deps.scope}</scope>
       </dependency>
       <dependency>
-        <groupId>net.jpountz.lz4</groupId>
-        <artifactId>lz4</artifactId>
-        <version>1.3.0</version>
+        <groupId>org.lz4</groupId>
+        <artifactId>lz4-java</artifactId>
+        <version>1.4.0</version>
       </dependency>
       <dependency>
         <groupId>com.clearspring.analytics</groupId>

http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1793da0..7ba85bd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -41,7 +41,10 @@ object MimaExcludes {
 
     // [SPARK-19937] Add remote bytes read to disk.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"),
+
+    // [SPARK-21276] Update lz4-java to the latest (v1.4.0)
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream")
   )
 
   // Exclude rules for 2.2.x

