Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/17 18:33:41 UTC

spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

Repository: spark
Updated Branches:
  refs/heads/master 50217667c -> f2cc6b5bc


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix.
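
For context, here is a minimal sketch (not part of the patch, written against the snappy-java 1.1.1.x behavior described in issue 107) of the failure mode being guarded against: the second `close()` returns the stream's internal buffer to snappy-java's shared CachedBufferAllocator pool a second time, so a later stream can be handed a buffer that is still in use, silently corrupting its output.

    import java.io.ByteArrayOutputStream
    import org.xerial.snappy.SnappyOutputStream

    val out = new SnappyOutputStream(new ByteArrayOutputStream(), 32 * 1024)
    out.write(Array[Byte](1, 2, 3))
    out.close()
    // Hazard: in affected snappy-java versions this second close() releases
    // the internal buffer to the shared pool again, so two live streams may
    // later end up sharing (and clobbering) the same buffer.
    out.close()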

Author: Josh Rosen <jo...@databricks.com>

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cc6b5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cc6b5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cc6b5b

Branch: refs/heads/master
Commit: f2cc6b5bccc3a70fd7d69183b1a068800831fe19
Parents: 5021766
Author: Josh Rosen <jo...@databricks.com>
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sun May 17 09:30:49 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++++++++++++++++++-
 .../unsafe/UnsafeShuffleWriterSuite.java        |  8 ----
 2 files changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..0d8ac1f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+    if (closed) {
+      throw new IOException("Stream is closed")
+    }
+    os.flush()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      os.close()
+    }
+  }
+}

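A hedged usage sketch of the wrapper's semantics (the codec class and method names are taken from the diff above; the SparkConf setup is illustrative): a second `close()` becomes a no-op, and write-after-close fails fast with an IOException instead of silently corrupting the stream.

    import java.io.{ByteArrayOutputStream, IOException}
    import org.apache.spark.SparkConf
    import org.apache.spark.io.SnappyCompressionCodec

    val codec = new SnappyCompressionCodec(new SparkConf())
    val out = codec.compressedOutputStream(new ByteArrayOutputStream())
    out.write(Array[Byte](1, 2, 3))
    out.close()
    out.close()      // safe: the wrapper makes close() idempotent
    try {
      out.write(4)   // guarded: throws IOException("Stream is closed")
    } catch {
      case e: IOException => println(e.getMessage)
    }
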
http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e5264..730d265 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public class UnsafeShuffleWriterSuite {
   @After
   public void tearDown() {
     Utils.deleteRecursively(tempDir);
-    // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
-    // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
-    // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
-    // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
-    synchronized (CachedBufferAllocator.class) {
-      CachedBufferAllocator.queueTable.clear();
-    }
     final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
     if (leakedMemory != 0) {
       fail("Test leaked " + leakedMemory + " bytes of managed memory");

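For reference, here is the deleted tearDown workaround expressed as a Scala sketch (assuming the snappy-java 1.1.1.x layout, in which `CachedBufferAllocator.queueTable` is accessible): draining the shared buffer pool between tests discarded any buffer that a double `close()` had returned twice, masking the corruption rather than fixing it. With the wrapper in place the second `close()` never reaches snappy-java, so the hack can be dropped.

    import org.xerial.snappy.buffer.CachedBufferAllocator

    // Equivalent of the removed Java block: empty the global pool of reusable
    // buffers so a doubly-released buffer cannot be handed out to two streams.
    classOf[CachedBufferAllocator].synchronized {
      CachedBufferAllocator.queueTable.clear()
    }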
