Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/06 22:07:35 UTC

spark git commit: [SPARK-17110] Fix StreamCorruptedException in BlockManager.getRemoteValues()

Repository: spark
Updated Branches:
  refs/heads/master 8bbb08a30 -> 29cfab3f1


[SPARK-17110] Fix StreamCorruptedException in BlockManager.getRemoteValues()

## What changes were proposed in this pull request?

This patch fixes a `java.io.StreamCorruptedException` error affecting remote reads of cached values when certain data types are used. The problem stems from #11801 / SPARK-13990, a patch which has Spark automatically pick the "best" serializer when caching RDDs. If PySpark cached a PythonRDD, it would be stored as an `RDD[Array[Byte]]` and the automatic serializer selection would pick KryoSerializer for replication and block transfer. However, the `getRemoteValues()` / `getRemoteBytes()` code path did not pass the proper class tags needed to select the same serializer during deserialization, so the Java serializer was used instead of Kryo, leading to the StreamCorruptedException.
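
To illustrate the mismatch (a minimal, hypothetical sketch of the selection rule, not Spark's actual `SerializerManager` code): serializer choice is keyed off the element class tag, so a read path that loses the tag can end up with a different serializer than the write path used.

```scala
import scala.reflect.ClassTag

object SerializerChoiceSketch extends App {
  // Hypothetical stand-in for the automatic selection rule: Kryo for byte arrays
  // and primitives, Java serialization otherwise.
  def chosenSerializer[T](ct: ClassTag[T]): String =
    if (ct.runtimeClass == classOf[Array[Byte]] || ct.runtimeClass.isPrimitive) "Kryo" else "Java"

  // Write side: a cached PythonRDD carries ClassTag[Array[Byte]], so Kryo is picked.
  println(chosenSerializer(ClassTag(classOf[Array[Byte]])))  // Kryo

  // Old read side: getRemoteValues() had no class tag (effectively ClassTag.Any),
  // so Kryo-serialized bytes were handed to the Java serializer and the stream
  // header check failed with java.io.StreamCorruptedException.
  println(chosenSerializer(ClassTag.Any))                    // Java
}
```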

We already fixed a similar bug in #14311, which dealt with similar issues in block replication. Prior to that patch, it seems that we had no tests to ensure that block replication actually succeeded. Similarly, prior to this bug fix patch it looks like we had no tests to perform remote reads of cached data, which is why this bug was able to remain latent for so long.

This patch addresses the bug by modifying `BlockManager`'s `get()` and `getRemoteValues()` methods to accept ClassTags, allowing the proper class tag to be threaded through the `getOrElseUpdate` code path (which is used by `rdd.iterator`).
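
A condensed view of the resulting class-tag plumbing (signatures simplified from the diff below; `BlockId` and `BlockResult` are placeholder stand-ins here):

```scala
import java.io.InputStream
import scala.reflect.ClassTag

// Placeholder types standing in for Spark's real BlockId / BlockResult.
trait BlockId
trait BlockResult { def data: Iterator[Any] }

object TagPlumbingSketch {
  // SerializerManager now receives the tag in an explicit second parameter list.
  def dataDeserializeStream[T](id: BlockId, in: InputStream)(classTag: ClassTag[T]): Iterator[T] = ???

  // getRemoteValues and get are now parameterized by a ClassTag...
  def getRemoteValues[T: ClassTag](id: BlockId): Option[BlockResult] = ???
  def get[T: ClassTag](id: BlockId): Option[BlockResult] = getRemoteValues[T](id)

  // ...so getOrElseUpdate (used by rdd.iterator) can forward the RDD's element
  // class tag instead of losing it.
  def getOrElseUpdate[T](
      id: BlockId,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] =
    get[T](id)(classTag) match {
      case Some(block) => Left(block)
      case None        => Right(makeIterator())
    }
}
```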

## How was this patch tested?

Extended the caching tests in `DistributedSuite` to exercise the `getRemoteValues` path, plus manual testing to verify that the PySpark bug reproduction in SPARK-17110 is fixed.
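
For reference, a rough Scala analogue of the failure mode (the actual reproduction in SPARK-17110 is a PySpark job; the cluster setup, partitioning, and data sizes below are assumptions, and the remote read only occurs when a later task lands on an executor that does not hold the block locally):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RemoteReadRepro {
  def main(args: Array[String]): Unit = {
    // Assumes a multi-executor cluster; in local mode all reads are local and
    // the remote-read path is never exercised.
    val sc = new SparkContext(new SparkConf().setAppName("SPARK-17110-repro"))

    // Cache byte-array records, the same shape a cached PythonRDD is stored in,
    // so automatic serializer selection picks Kryo for the cached blocks.
    val cached = sc.parallelize(1 to 1000, numSlices = 10)
      .map(i => Array.fill[Byte](128)(i.toByte))
      .cache()
    cached.count()

    // When a later task is scheduled with ANY locality on an executor without a
    // local copy, BlockManager.getRemoteValues() deserializes the fetched bytes;
    // before this fix that path used the Java serializer and could fail with
    // java.io.StreamCorruptedException.
    println(cached.map(_.length).reduce(_ + _))
    sc.stop()
  }
}
```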

Author: Josh Rosen <jo...@databricks.com>

Closes #14952 from JoshRosen/SPARK-17110.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29cfab3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29cfab3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29cfab3f

Branch: refs/heads/master
Commit: 29cfab3f1524c5690be675d24dda0a9a1806d6ff
Parents: 8bbb08a
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Sep 6 15:07:28 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Sep 6 15:07:28 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/BlockRDD.scala   |  2 +-
 .../apache/spark/serializer/SerializerManager.scala  |  7 ++++---
 .../org/apache/spark/storage/BlockManager.scala      | 15 ++++++++-------
 .../scala/org/apache/spark/DistributedSuite.scala    |  6 ++++--
 .../streaming/rdd/WriteAheadLogBackedBlockRDD.scala  |  5 +++--
 .../spark/streaming/ReceivedBlockHandlerSuite.scala  |  3 ++-
 6 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 63d1d17..d47b755 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -44,7 +44,7 @@ class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[Blo
     assertValid()
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
-    blockManager.get(blockId) match {
+    blockManager.get[T](blockId) match {
       case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
         throw new Exception("Could not compute split, block " + blockId + " not found")

http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 7b1ec6f..2156d57 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -180,11 +180,12 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
    * Deserializes an InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
-  def dataDeserializeStream[T: ClassTag](
+  def dataDeserializeStream[T](
       blockId: BlockId,
-      inputStream: InputStream): Iterator[T] = {
+      inputStream: InputStream)
+      (classTag: ClassTag[T]): Iterator[T] = {
     val stream = new BufferedInputStream(inputStream)
-    getSerializer(implicitly[ClassTag[T]])
+    getSerializer(classTag)
       .newInstance()
       .deserializeStream(wrapStream(blockId, stream))
       .asIterator.asInstanceOf[Iterator[T]]

http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c72f28e..0614646 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -520,10 +520,11 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+  private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
+    val ct = implicitly[ClassTag[T]]
     getRemoteBytes(blockId).map { data =>
       val values =
-        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
       new BlockResult(values, DataReadMethod.Network, data.size)
     }
   }
@@ -602,13 +603,13 @@ private[spark] class BlockManager(
    * any locks if the block was fetched from a remote block manager. The read lock will
    * automatically be freed once the result's `data` iterator is fully consumed.
    */
-  def get(blockId: BlockId): Option[BlockResult] = {
+  def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
     val local = getLocalValues(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
       return local
     }
-    val remote = getRemoteValues(blockId)
+    val remote = getRemoteValues[T](blockId)
     if (remote.isDefined) {
       logInfo(s"Found block $blockId remotely")
       return remote
@@ -660,7 +661,7 @@ private[spark] class BlockManager(
       makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
     // Attempt to read the block from local or remote storage. If it's present, then we don't need
     // to go through the local-get-or-put path.
-    get(blockId) match {
+    get[T](blockId)(classTag) match {
       case Some(block) =>
         return Left(block)
       case _ =>
@@ -1204,8 +1205,8 @@ private[spark] class BlockManager(
   /**
    * Read a block consisting of a single object.
    */
-  def getSingle(blockId: BlockId): Option[Any] = {
-    get(blockId).map(_.data.next())
+  def getSingle[T: ClassTag](blockId: BlockId): Option[T] = {
+    get[T](blockId).map(_.data.next().asInstanceOf[T])
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 4ee0e00..4e36adc 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -170,10 +170,12 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
         blockId.toString)
-      val deserialized = serializerManager.dataDeserializeStream[Int](blockId,
-        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream()).toList
+      val deserialized = serializerManager.dataDeserializeStream(blockId,
+        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream())(data.elementClassTag).toList
       assert(deserialized === (1 to 100).toList)
     }
+    // This will exercise the getRemoteBytes / getRemoteValues code paths:
+    assert(blockIds.flatMap(id => blockManager.get[Int](id).get.data).toSet === (1 to 1000).toSet)
   }
 
   Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 53fccd8..0b2ec29 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -120,7 +120,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     val blockId = partition.blockId
 
     def getBlockFromBlockManager(): Option[Iterator[T]] = {
-      blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
+      blockManager.get[T](blockId).map(_.data.asInstanceOf[Iterator[T]])
     }
 
     def getBlockFromWriteAheadLog(): Iterator[T] = {
@@ -163,7 +163,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         dataRead.rewind()
       }
       serializerManager
-        .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream())
+        .dataDeserializeStream(
+          blockId, new ChunkedByteBuffer(dataRead).toInputStream())(elementClassTag)
         .asInstanceOf[Iterator[T]]
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29cfab3f/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index feb5c30..7e66545 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.{BeforeAndAfter, Matchers}
@@ -163,7 +164,7 @@ class ReceivedBlockHandlerSuite
           val bytes = reader.read(fileSegment)
           reader.close()
           serializerManager.dataDeserializeStream(
-            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList
+            generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream())(ClassTag.Any).toList
         }
         loggedData shouldEqual data
       }

