You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/03/25 01:33:54 UTC

spark git commit: [SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore

Repository: spark
Updated Branches:
  refs/heads/master 2cf46d5a9 -> fdd460f5f


[SPARK-13980] Incrementally serialize blocks while unrolling them in MemoryStore

When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements.

Instead, I think that we should incrementally serialize blocks while unrolling them.

A downside of incremental serialization is that we will need to deserialize the partially-unrolled data if there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses from the extra serialization and deserialization work in that hopefully-rare case.

Author: Josh Rosen <jo...@databricks.com>

Closes #11791 from JoshRosen/serialize-incrementally.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdd460f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdd460f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdd460f5

Branch: refs/heads/master
Commit: fdd460f5f47e4023d81d5a3d918bd4a16ecbb580
Parents: 2cf46d5
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Mar 24 17:33:21 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Mar 24 17:33:21 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala |  49 ++--
 .../spark/storage/memory/MemoryStore.scala      | 229 +++++++++++++++++--
 .../spark/util/ByteBufferOutputStream.scala     |   2 +
 .../util/io/ByteArrayChunkOutputStream.scala    |  11 +-
 .../spark/storage/BlockManagerSuite.scala       |   9 +-
 .../apache/spark/storage/MemoryStoreSuite.scala | 169 +++++++++++---
 6 files changed, 392 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index eebb43e..30d2e23 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -746,7 +746,7 @@ private[spark] class BlockManager(
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
           val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
-          memoryStore.putIterator(blockId, values, level, classTag) match {
+          memoryStore.putIteratorAsValues(blockId, values, classTag) match {
             case Right(_) => true
             case Left(iter) =>
               // If putting deserialized values in memory failed, we will put the bytes directly to
@@ -876,21 +876,40 @@ private[spark] class BlockManager(
       if (level.useMemory) {
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
-        memoryStore.putIterator(blockId, iterator(), level, classTag) match {
-          case Right(s) =>
-            size = s
-          case Left(iter) =>
-            // Not enough space to unroll this block; drop to disk if applicable
-            if (level.useDisk) {
-              logWarning(s"Persisting block $blockId to disk instead.")
-              diskStore.put(blockId) { fileOutputStream =>
-                serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
+        if (level.deserialized) {
+          memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
+            case Right(s) =>
+              size = s
+            case Left(iter) =>
+              // Not enough space to unroll this block; drop to disk if applicable
+              if (level.useDisk) {
+                logWarning(s"Persisting block $blockId to disk instead.")
+                diskStore.put(blockId) { fileOutputStream =>
+                  serializerManager.dataSerializeStream(blockId, fileOutputStream, iter)(classTag)
+                }
+                size = diskStore.getSize(blockId)
+              } else {
+                iteratorFromFailedMemoryStorePut = Some(iter)
               }
-              size = diskStore.getSize(blockId)
-            } else {
-              iteratorFromFailedMemoryStorePut = Some(iter)
-            }
+          }
+        } else { // !level.deserialized
+          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
+            case Right(s) =>
+              size = s
+            case Left(partiallySerializedValues) =>
+              // Not enough space to unroll this block; drop to disk if applicable
+              if (level.useDisk) {
+                logWarning(s"Persisting block $blockId to disk instead.")
+                diskStore.put(blockId) { fileOutputStream =>
+                  partiallySerializedValues.finishWritingToStream(fileOutputStream)
+                }
+                size = diskStore.getSize(blockId)
+              } else {
+                iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
+              }
+          }
         }
+
       } else if (level.useDisk) {
         diskStore.put(blockId) { fileOutputStream =>
           serializerManager.dataSerializeStream(blockId, fileOutputStream, iterator())(classTag)
@@ -991,7 +1010,7 @@ private[spark] class BlockManager(
           // Note: if we had a means to discard the disk iterator, we would do that here.
           memoryStore.getValues(blockId).get
         } else {
-          memoryStore.putIterator(blockId, diskIterator, level, classTag) match {
+          memoryStore.putIteratorAsValues(blockId, diskIterator, classTag) match {
             case Left(iter) =>
               // The memory store put() failed, so it returned the iterator back to us:
               iter

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90016cb..1a78c9c 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -17,20 +17,24 @@
 
 package org.apache.spark.storage.memory
 
+import java.io.OutputStream
+import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
+import com.google.common.io.ByteStreams
+
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.MemoryManager
-import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.ChunkedByteBuffer
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 private sealed trait MemoryEntry[T] {
   def size: Long
@@ -42,8 +46,9 @@ private case class DeserializedMemoryEntry[T](
     classTag: ClassTag[T]) extends MemoryEntry[T]
 private case class SerializedMemoryEntry[T](
     buffer: ChunkedByteBuffer,
-    size: Long,
-    classTag: ClassTag[T]) extends MemoryEntry[T]
+    classTag: ClassTag[T]) extends MemoryEntry[T] {
+  def size: Long = buffer.size
+}
 
 private[storage] trait BlockEvictionHandler {
   /**
@@ -132,7 +137,7 @@ private[spark] class MemoryStore(
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
-      val entry = new SerializedMemoryEntry[T](bytes, size, implicitly[ClassTag[T]])
+      val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -145,7 +150,7 @@ private[spark] class MemoryStore(
   }
 
   /**
-   * Attempt to put the given block in memory store.
+   * Attempt to put the given block in memory store as values.
    *
    * It's possible that the iterator is too large to materialize and store in memory. To avoid
    * OOM exceptions, this method will gradually unroll the iterator while periodically checking
@@ -160,10 +165,9 @@ private[spark] class MemoryStore(
    *         iterator or call `close()` on it in order to free the storage memory consumed by the
    *         partially-unrolled block.
    */
-  private[storage] def putIterator[T](
+  private[storage] def putIteratorAsValues[T](
       blockId: BlockId,
       values: Iterator[T],
-      level: StorageLevel,
       classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
 
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
@@ -218,12 +222,8 @@ private[spark] class MemoryStore(
       // We successfully unrolled the entirety of this block
       val arrayValues = vector.toArray
       vector = null
-      val entry = if (level.deserialized) {
+      val entry =
         new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
-      } else {
-        val bytes = serializerManager.dataSerialize(blockId, arrayValues.iterator)(classTag)
-        new SerializedMemoryEntry[T](bytes, bytes.size, classTag)
-      }
       val size = entry.size
       def transferUnrollToStorage(amount: Long): Unit = {
         // Synchronize so that transfer is atomic
@@ -255,12 +255,8 @@ private[spark] class MemoryStore(
         entries.synchronized {
           entries.put(blockId, entry)
         }
-        val bytesOrValues = if (level.deserialized) "values" else "bytes"
-        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-          blockId,
-          bytesOrValues,
-          Utils.bytesToString(size),
-          Utils.bytesToString(maxMemory - blocksMemoryUsed)))
+        logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
+          blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
         Right(size)
       } else {
         assert(currentUnrollMemoryForThisTask >= currentUnrollMemoryForThisTask,
@@ -279,13 +275,117 @@ private[spark] class MemoryStore(
     }
   }
 
+  /**
+   * Attempt to put the given block in memory store as bytes.
+   *
+   * It's possible that the iterator is too large to materialize and store in memory. To avoid
+   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
+   * whether there is enough free memory. If the block is successfully materialized, then the
+   * temporary unroll memory used during the materialization is "transferred" to storage memory,
+   * so we won't acquire more memory than is actually needed to store the block.
+   *
+   * @return in case of success, the estimated size of the stored data. In case of
+   *         failure, return a handle which allows the caller to either finish the serialization
+   *         by spilling to disk or to deserialize the partially-serialized block and reconstruct
+   *         the original input iterator. The caller must either fully consume this result
+   *         iterator or call `discard()` on it in order to free the storage memory consumed by the
+   *         partially-unrolled block.
+   */
+  private[storage] def putIteratorAsBytes[T](
+      blockId: BlockId,
+      values: Iterator[T],
+      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+
+    // Whether there is still enough memory for us to continue unrolling this block
+    var keepUnrolling = true
+    // Initial per-task memory to request for unrolling blocks (bytes).
+    val initialMemoryThreshold = unrollMemoryThreshold
+    // Keep track of unroll memory used by this particular block / putIteratorAsBytes() operation
+    var unrollMemoryUsedByThisBlock = 0L
+    // Underlying buffer for unrolling the block
+    val redirectableStream = new RedirectableOutputStream
+    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
+    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+    val serializationStream: SerializationStream = {
+      val ser = serializerManager.getSerializer(classTag).newInstance()
+      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
+    }
+
+    // Request enough memory to begin unrolling
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+
+    if (!keepUnrolling) {
+      logWarning(s"Failed to reserve initial memory threshold of " +
+        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+    } else {
+      unrollMemoryUsedByThisBlock += initialMemoryThreshold
+    }
+
+    def reserveAdditionalMemoryIfNecessary(): Unit = {
+      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+        if (keepUnrolling) {
+          unrollMemoryUsedByThisBlock += amountToRequest
+        }
+      }
+    }
+
+    // Unroll this block safely, checking whether we have exceeded our threshold
+    while (values.hasNext && keepUnrolling) {
+      serializationStream.writeObject(values.next())(classTag)
+      reserveAdditionalMemoryIfNecessary()
+    }
+
+    // Make sure that we have enough memory to store the block. By this point, it is possible that
+    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
+    // perform one final call to attempt to allocate additional memory if necessary.
+    if (keepUnrolling) {
+      serializationStream.close()
+      reserveAdditionalMemoryIfNecessary()
+    }
+
+    if (keepUnrolling) {
+      val entry = SerializedMemoryEntry[T](
+        new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
+      // Synchronize so that transfer is atomic
+      memoryManager.synchronized {
+        releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
+        val success = memoryManager.acquireStorageMemory(blockId, entry.size)
+        assert(success, "transferring unroll memory to storage memory failed")
+      }
+      entries.synchronized {
+        entries.put(blockId, entry)
+      }
+      logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
+        blockId, Utils.bytesToString(entry.size), Utils.bytesToString(blocksMemoryUsed)))
+      Right(entry.size)
+    } else {
+      // We ran out of space while unrolling the values for this block
+      logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+      Left(
+        new PartiallySerializedBlock(
+          this,
+          serializerManager,
+          blockId,
+          serializationStream,
+          redirectableStream,
+          unrollMemoryUsedByThisBlock,
+          new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+          values,
+          classTag))
+    }
+  }
+
   def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     val entry = entries.synchronized { entries.get(blockId) }
     entry match {
       case null => None
       case e: DeserializedMemoryEntry[_] =>
         throw new IllegalArgumentException("should only call getBytes on serialized blocks")
-      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
+      case SerializedMemoryEntry(bytes, _) => Some(bytes)
     }
   }
 
@@ -373,7 +473,7 @@ private[spark] class MemoryStore(
       def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
         val data = entry match {
           case DeserializedMemoryEntry(values, _, _) => Left(values)
-          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
+          case SerializedMemoryEntry(buffer, _) => Right(buffer)
         }
         val newEffectiveStorageLevel =
           blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
@@ -507,12 +607,13 @@ private[spark] class MemoryStore(
 }
 
 /**
- * The result of a failed [[MemoryStore.putIterator()]] call.
+ * The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
  *
- * @param memoryStore the memoryStore, used for freeing memory.
+ * @param memoryStore  the memoryStore, used for freeing memory.
  * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
- * @param unrolled an iterator for the partially-unrolled values.
- * @param rest the rest of the original iterator passed to [[MemoryStore.putIterator()]].
+ * @param unrolled     an iterator for the partially-unrolled values.
+ * @param rest         the rest of the original iterator passed to
+ *                     [[MemoryStore.putIteratorAsValues()]].
  */
 private[storage] class PartiallyUnrolledIterator[T](
     memoryStore: MemoryStore,
@@ -544,3 +645,81 @@ private[storage] class PartiallyUnrolledIterator[T](
     iter = null
   }
 }
+
+/**
+ * A wrapper which allows an open [[OutputStream]] to be redirected to a different sink.
+ */
+private class RedirectableOutputStream extends OutputStream {
+  private[this] var os: OutputStream = _
+  def setOutputStream(s: OutputStream): Unit = { os = s }
+  override def write(b: Int): Unit = os.write(b)
+  override def write(b: Array[Byte]): Unit = os.write(b)
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = os.write(b, off, len)
+  override def flush(): Unit = os.flush()
+  override def close(): Unit = os.close()
+}
+
+/**
+ * The result of a failed [[MemoryStore.putIteratorAsBytes()]] call.
+ *
+ * @param memoryStore the MemoryStore, used for freeing memory.
+ * @param serializerManager the SerializerManager, used for deserializing values.
+ * @param blockId the block id.
+ * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
+ * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
+ * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param unrolled a byte buffer containing the partially-serialized values.
+ * @param rest         the rest of the original iterator passed to
+ *                     [[MemoryStore.putIteratorAsBytes()]].
+ * @param classTag the [[ClassTag]] for the block.
+ */
+private[storage] class PartiallySerializedBlock[T](
+    memoryStore: MemoryStore,
+    serializerManager: SerializerManager,
+    blockId: BlockId,
+    serializationStream: SerializationStream,
+    redirectableOutputStream: RedirectableOutputStream,
+    unrollMemory: Long,
+    unrolled: ChunkedByteBuffer,
+    rest: Iterator[T],
+    classTag: ClassTag[T]) {
+
+  /**
+   * Called to dispose of this block and free its memory.
+   */
+  def discard(): Unit = {
+    try {
+      serializationStream.close()
+    } finally {
+      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+    }
+  }
+
+  /**
+   * Finish writing this block to the given output stream by first writing the serialized values
+   * and then serializing the values from the original input iterator.
+   */
+  def finishWritingToStream(os: OutputStream): Unit = {
+    ByteStreams.copy(unrolled.toInputStream(), os)
+    redirectableOutputStream.setOutputStream(os)
+    while (rest.hasNext) {
+      serializationStream.writeObject(rest.next())(classTag)
+    }
+    discard()
+  }
+
+  /**
+   * Returns an iterator over the values in this block by first deserializing the serialized
+   * values and then consuming the rest of the original input iterator.
+   *
+   * If the caller does not plan to fully consume the resulting iterator then they must call
+   * `close()` on it to free its resources.
+   */
+  def valuesIterator: PartiallyUnrolledIterator[T] = {
+    new PartiallyUnrolledIterator(
+      memoryStore,
+      unrollMemory,
+      unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
+      rest = rest)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
index 8527e3a..09e7579 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -27,6 +27,8 @@ private[spark] class ByteBufferOutputStream(capacity: Int) extends ByteArrayOutp
 
   def this() = this(32)
 
+  def getCount(): Int = count
+
   def toByteBuffer: ByteBuffer = {
     return ByteBuffer.wrap(buf, 0, count)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
index daac6f9..16fe3be 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
@@ -30,10 +30,10 @@ import scala.collection.mutable.ArrayBuffer
 private[spark]
 class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
 
-  private val chunks = new ArrayBuffer[Array[Byte]]
+  private[this] val chunks = new ArrayBuffer[Array[Byte]]
 
   /** Index of the last chunk. Starting with -1 when the chunks array is empty. */
-  private var lastChunkIndex = -1
+  private[this] var lastChunkIndex = -1
 
   /**
    * Next position to write in the last chunk.
@@ -41,12 +41,16 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
    * If this equals chunkSize, it means for next write we need to allocate a new chunk.
    * This can also never be 0.
    */
-  private var position = chunkSize
+  private[this] var position = chunkSize
+  private[this] var _size = 0
+
+  def size: Long = _size
 
   override def write(b: Int): Unit = {
     allocateNewChunkIfNeeded()
     chunks(lastChunkIndex)(position) = b.toByte
     position += 1
+    _size += 1
   }
 
   override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
@@ -58,6 +62,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
       written += thisBatch
       position += thisBatch
     }
+    _size += len
   }
 
   @inline

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 94f6f87..7a4cb39 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1035,7 +1035,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   test("safely unroll blocks through putIterator (disk)") {
     store = makeBlockManager(12000)
-    val memAndDisk = StorageLevel.MEMORY_AND_DISK
     val memoryStore = store.memoryStore
     val diskStore = store.diskStore
     val smallList = List.fill(40)(new Array[Byte](100))
@@ -1044,12 +1043,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    store.putIterator("b1", smallIterator, memAndDisk)
-    store.putIterator("b2", smallIterator, memAndDisk)
+    store.putIterator("b1", smallIterator, StorageLevel.MEMORY_AND_DISK)
+    store.putIterator("b2", smallIterator, StorageLevel.MEMORY_AND_DISK)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
     // Memory store should contain b2 and b3, while disk store should contain only b1
-    val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, ClassTag.Any)
+    val result3 = memoryStore.putIteratorAsValues("b3", smallIterator, ClassTag.Any)
     assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
@@ -1065,7 +1064,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     // the block may be stored to disk. During the unrolling process, block "b2" should be kicked
     // out, so the memory store should contain only b3, while the disk store should contain
     // b1, b2 and b4.
-    val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, ClassTag.Any)
+    val result4 = memoryStore.putIteratorAsValues("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft)
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))

http://git-wip-us.apache.org/repos/asf/spark/blob/fdd460f5/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b4ab67c..43e832d 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -29,7 +29,7 @@ import org.scalatest._
 import org.apache.spark._
 import org.apache.spark.memory.StaticMemoryManager
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
-import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallyUnrolledIterator}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
@@ -47,6 +47,8 @@ class MemoryStoreSuite
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
 
+  val serializerManager = new SerializerManager(serializer, conf)
+
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
   def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
@@ -61,7 +63,6 @@ class MemoryStoreSuite
 
   def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
-    val serializerManager = new SerializerManager(serializer, conf)
     val blockInfoManager = new BlockInfoManager
     val blockEvictionHandler = new BlockEvictionHandler {
       var memoryStore: MemoryStore = _
@@ -121,20 +122,20 @@ class MemoryStoreSuite
     val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    def putIterator[T](
+    def putIteratorAsValues[T](
         blockId: BlockId,
         iter: Iterator[T],
         classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
       assert(blockInfoManager.lockNewBlockForWriting(
         blockId,
         new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
-      val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+      val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
       blockInfoManager.unlock(blockId)
       res
     }
 
     // Unroll with all the space in the world. This should succeed.
-    var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+    var putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
     assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach { case (e, a) =>
@@ -145,9 +146,9 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("unroll")
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
-    assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
-    assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
-    putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+    assert(putIteratorAsValues("someBlock1", smallList.iterator, ct).isRight)
+    assert(putIteratorAsValues("someBlock2", smallList.iterator, ct).isRight)
+    putResult = putIteratorAsValues("unroll", smallList.iterator, ClassTag.Any)
     assert(putResult.isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     assert(memoryStore.contains("someBlock2"))
@@ -162,8 +163,8 @@ class MemoryStoreSuite
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
     // In the meantime, however, we kicked out someBlock2 before giving up.
-    assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
-    putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+    assert(putIteratorAsValues("someBlock3", smallList.iterator, ct).isRight)
+    putResult = putIteratorAsValues("unroll", bigList.iterator, ClassTag.Any)
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
     assert(!memoryStore.contains("someBlock2"))
     assert(putResult.isLeft)
@@ -174,7 +175,7 @@ class MemoryStoreSuite
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
 
-  test("safely unroll blocks through putIterator") {
+  test("safely unroll blocks through putIteratorAsValues") {
     val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
@@ -182,21 +183,21 @@ class MemoryStoreSuite
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    def putIterator[T](
+    def putIteratorAsValues[T](
         blockId: BlockId,
         iter: Iterator[T],
         classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
       assert(blockInfoManager.lockNewBlockForWriting(
         blockId,
         new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster = false)))
-      val res = memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, classTag)
+      val res = memoryStore.putIteratorAsValues(blockId, iter, classTag)
       blockInfoManager.unlock(blockId)
       res
     }
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
-    val result1 = putIterator("b1", smallIterator, ClassTag.Any)
-    val result2 = putIterator("b2", smallIterator, ClassTag.Any)
+    val result1 = putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+    val result2 = putIteratorAsValues("b2", smallIterator, ClassTag.Any)
     assert(memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(result1.isRight) // unroll was successful
@@ -211,11 +212,11 @@ class MemoryStoreSuite
     blockInfoManager.lockForWriting("b2")
     memoryStore.remove("b2")
     blockInfoManager.removeBlock("b2")
-    putIterator("b1", smallIterator, ClassTag.Any)
-    putIterator("b2", smallIterator, ClassTag.Any)
+    putIteratorAsValues("b1", smallIterator, ClassTag.Any)
+    putIteratorAsValues("b2", smallIterator, ClassTag.Any)
 
     // Unroll with not enough space. This should succeed but kick out b1 in the process.
-    val result3 = putIterator("b3", smallIterator, ClassTag.Any)
+    val result3 = putIteratorAsValues("b3", smallIterator, ClassTag.Any)
     assert(result3.isRight)
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
@@ -224,10 +225,10 @@ class MemoryStoreSuite
     blockInfoManager.lockForWriting("b3")
     assert(memoryStore.remove("b3"))
     blockInfoManager.removeBlock("b3")
-    putIterator("b3", smallIterator, ClassTag.Any)
+    putIteratorAsValues("b3", smallIterator, ClassTag.Any)
 
     // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
-    val result4 = putIterator("b4", bigIterator, ClassTag.Any)
+    val result4 = putIteratorAsValues("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
     assert(!memoryStore.contains("b2"))
@@ -238,41 +239,151 @@ class MemoryStoreSuite
     assert(memoryStore.currentUnrollMemoryForThisTask === 0) // close released the unroll memory
   }
 
+  test("safely unroll blocks through putIteratorAsBytes") {
+    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val smallList = List.fill(40)(new Array[Byte](100))
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+    def putIteratorAsBytes[T](
+        blockId: BlockId,
+        iter: Iterator[T],
+        classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+      assert(blockInfoManager.lockNewBlockForWriting(
+        blockId,
+        new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false)))
+      val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
+      blockInfoManager.unlock(blockId)
+      res
+    }
+
+    // Unroll with plenty of space. This should succeed and cache both blocks.
+    val result1 = putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+    val result2 = putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+    assert(memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(result1.isRight) // unroll was successful
+    assert(result2.isRight)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+    // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
+    // would not know how to drop them from memory later.
+    blockInfoManager.lockForWriting("b1")
+    memoryStore.remove("b1")
+    blockInfoManager.removeBlock("b1")
+    blockInfoManager.lockForWriting("b2")
+    memoryStore.remove("b2")
+    blockInfoManager.removeBlock("b2")
+    putIteratorAsBytes("b1", smallIterator, ClassTag.Any)
+    putIteratorAsBytes("b2", smallIterator, ClassTag.Any)
+
+    // Unroll with not enough space. This should succeed but kick out b1 in the process.
+    val result3 = putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+    assert(result3.isRight)
+    assert(!memoryStore.contains("b1"))
+    assert(memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+    blockInfoManager.lockForWriting("b3")
+    assert(memoryStore.remove("b3"))
+    blockInfoManager.removeBlock("b3")
+    putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
+
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+    val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
+    assert(result4.isLeft) // unroll was unsuccessful
+    assert(!memoryStore.contains("b1"))
+    assert(!memoryStore.contains("b2"))
+    assert(memoryStore.contains("b3"))
+    assert(!memoryStore.contains("b4"))
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned a partial block
+    result4.left.get.discard()
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0) // discard released the unroll memory
+  }
+
+  test("PartiallySerializedBlock.valuesIterator") {
+    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+    // Unroll huge block with not enough space. This should fail.
+    assert(blockInfoManager.lockNewBlockForWriting(
+      "b1",
+      new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    blockInfoManager.unlock("b1")
+    assert(res.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+    val valuesReturnedFromFailedPut = res.left.get.valuesIterator.toSeq // force materialization
+    valuesReturnedFromFailedPut.zip(bigList).foreach { case (e, a) =>
+      assert(e === a, "PartiallySerializedBlock.valuesIterator() did not return original values!")
+    }
+    // The unroll memory was freed once the iterator was fully traversed.
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+  }
+
+  test("PartiallySerializedBlock.finishWritingToStream") {
+    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val bigList = List.fill(40)(new Array[Byte](1000))
+    def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
+
+    // Unroll huge block with not enough space. This should fail.
+    assert(blockInfoManager.lockNewBlockForWriting(
+      "b1",
+      new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
+    val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
+    blockInfoManager.unlock("b1")
+    assert(res.isLeft)
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0)
+    val bos = new ByteBufferOutputStream()
+    res.left.get.finishWritingToStream(bos)
+    // The unroll memory was freed once the block was fully written.
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+    val deserializationStream = serializerManager.dataDeserializeStream[Any](
+      "b1", new ByteBufferInputStream(bos.toByteBuffer))(ClassTag.Any)
+    deserializationStream.zip(bigList.iterator).foreach { case (e, a) =>
+      assert(e === a,
+        "PartiallySerializedBlock.finishWritingToStream() did not write original values!")
+    }
+  }
+
   test("multiple unrolls by the same thread") {
     val (memoryStore, _) = makeMemoryStore(12000)
     val smallList = List.fill(40)(new Array[Byte](100))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    def putIterator(
+    def putIteratorAsValues(
         blockId: BlockId,
         iter: Iterator[Any]): Either[PartiallyUnrolledIterator[Any], Long] = {
-       memoryStore.putIterator(blockId, iter, StorageLevel.MEMORY_ONLY, ClassTag.Any)
+       memoryStore.putIteratorAsValues(blockId, iter, ClassTag.Any)
     }
 
-    // All unroll memory used is released because putIterator did not return an iterator
+    // All unroll memory used is released because putIteratorAsValues did not return an iterator
-    assert(putIterator("b1", smallIterator).isRight)
+    assert(putIteratorAsValues("b1", smallIterator).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
-    assert(putIterator("b2", smallIterator).isRight)
+    assert(putIteratorAsValues("b2", smallIterator).isRight)
     assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
-    // Unroll memory is not released because putIterator returned an iterator
+    // Unroll memory is not released because putIteratorAsValues returned an iterator
     // that still depends on the underlying vector used in the process
-    assert(putIterator("b3", smallIterator).isLeft)
+    assert(putIteratorAsValues("b3", smallIterator).isLeft)
     val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after the previous unrolls
-    assert(putIterator("b4", smallIterator).isLeft)
+    assert(putIteratorAsValues("b4", smallIterator).isLeft)
     val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
-    assert(putIterator("b5", smallIterator).isLeft)
+    assert(putIteratorAsValues("b5", smallIterator).isLeft)
     val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
-    assert(putIterator("b6", smallIterator).isLeft)
+    assert(putIteratorAsValues("b6", smallIterator).isLeft)
     val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
-    assert(putIterator("b7", smallIterator).isLeft)
+    assert(putIteratorAsValues("b7", smallIterator).isLeft)
     val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org