You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/17 18:33:41 UTC
spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug
Repository: spark
Updated Branches:
refs/heads/master 50217667c -> f2cc6b5bc
[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug
This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent and to guard against write-after-`close()` bugs. This is a workaround for https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent `close()` method can lead to stream corruption. We can remove this workaround if we upgrade to a snappy-java version that contains my fix for this bug, but in the meantime this patch offers a backportable Spark fix.
Author: Josh Rosen <jo...@databricks.com>
Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following commits:
8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cc6b5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cc6b5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cc6b5b
Branch: refs/heads/master
Commit: f2cc6b5bccc3a70fd7d69183b1a068800831fe19
Parents: 5021766
Author: Josh Rosen <jo...@databricks.com>
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sun May 17 09:30:49 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/io/CompressionCodec.scala | 49 +++++++++++++++++++-
.../unsafe/UnsafeShuffleWriterSuite.java | 8 ----
2 files changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..0d8ac1f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
package org.apache.spark.io
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
- new SnappyOutputStream(s, blockSize)
+ new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
}
override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
}
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we upgrade to a version
+ * of snappy-java that contains the fix for https://github.com/xerial/snappy-java/issues/107.
+ *
+ * Behavior of this wrapper:
+ *  - every `write` overload and `flush()` delegates to the wrapped stream while the wrapper is
+ *    open, and throws an [[IOException]] with the message "Stream is closed" once `close()` has
+ *    been called;
+ *  - `close()` forwards to the wrapped stream on its first invocation only; later calls are
+ *    no-ops.
+ *
+ * NOTE(review): `closed` is a plain `var` and no synchronization is performed here, so this
+ * presumably relies on each stream being used by a single thread -- confirm against callers.
+ *
+ * @param os the Snappy stream that all operations are delegated to
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends OutputStream {
+
+ private[this] var closed: Boolean = false // flipped to true by close(); never reset
+
+ override def write(b: Int): Unit = { // single-value overload; rejected once closed
+ if (closed) {
+ throw new IOException("Stream is closed")
+ }
+ os.write(b)
+ }
+
+ override def write(b: Array[Byte]): Unit = { // whole-array overload; rejected once closed
+ if (closed) {
+ throw new IOException("Stream is closed")
+ }
+ os.write(b)
+ }
+
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = { // ranged overload
+ if (closed) {
+ throw new IOException("Stream is closed")
+ }
+ os.write(b, off, len)
+ }
+
+ override def flush(): Unit = { // flushing after close() is also reported as an error
+ if (closed) {
+ throw new IOException("Stream is closed")
+ }
+ os.flush()
+ }
+
+ override def close(): Unit = { // idempotent: only the first call reaches os.close()
+ if (!closed) {
+ closed = true // set before delegating, so os.close() runs at most once even if it throws
+ os.close()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e5264..730d265 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
@@ -97,13 +96,6 @@ public class UnsafeShuffleWriterSuite {
@After
public void tearDown() {
Utils.deleteRecursively(tempDir);
- // This call is a workaround for SPARK-7660, a snappy-java bug which is exposed by this test
- // suite. Clearing the cached buffer allocator's pool of reusable buffers masks this bug,
- // preventing a test failure in JavaAPISuite that would otherwise occur. The underlying bug
- // needs to be fixed, but in the meantime this workaround avoids spurious Jenkins failures.
- synchronized (CachedBufferAllocator.class) {
- CachedBufferAllocator.queueTable.clear();
- }
final long leakedMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
if (leakedMemory != 0) {
fail("Test leaked " + leakedMemory + " bytes of managed memory");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org