You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/03/18 04:01:37 UTC

spark git commit: [SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore

Repository: spark
Updated Branches:
  refs/heads/master 6037ed0a1 -> 6c2d894a2


[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore

This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.

This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.

This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet).

Author: Josh Rosen <jo...@databricks.com>

Closes #11748 from JoshRosen/chunked-block-serialization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c2d894a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c2d894a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c2d894a

Branch: refs/heads/master
Commit: 6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109
Parents: 6037ed0
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Mar 17 20:00:56 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Mar 17 20:00:56 2016 -0700

----------------------------------------------------------------------
 .../network/buffer/NettyManagedBuffer.java      |   2 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  11 +-
 .../org/apache/spark/executor/Executor.scala    |   5 +-
 .../spark/scheduler/TaskResultGetter.scala      |   2 +-
 .../org/apache/spark/storage/BlockManager.scala | 104 +++++----
 .../storage/BlockManagerManagedBuffer.scala     |   9 +-
 .../org/apache/spark/storage/DiskStore.scala    |  16 +-
 .../org/apache/spark/storage/StorageUtils.scala |  22 +-
 .../spark/storage/memory/MemoryStore.scala      |  58 +++--
 .../spark/util/ByteBufferInputStream.scala      |   8 +-
 .../spark/util/io/ChunkedByteBuffer.scala       | 214 +++++++++++++++++++
 .../spark/io/ChunkedByteBufferSuite.scala       |  93 ++++++++
 .../spark/storage/BlockManagerSuite.scala       |  19 +-
 .../apache/spark/storage/DiskStoreSuite.scala   |  14 +-
 .../rdd/WriteAheadLogBackedBlockRDD.scala       |   5 +-
 .../receiver/ReceivedBlockHandler.scala         |  15 +-
 .../streaming/ReceivedBlockHandlerSuite.scala   |   2 +-
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  |   2 +-
 18 files changed, 463 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
index 4c8802a..acc49d9 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.ByteBufInputStream;
 /**
  * A {@link ManagedBuffer} backed by a Netty {@link ByteBuf}.
  */
-public final class NettyManagedBuffer extends ManagedBuffer {
+public class NettyManagedBuffer extends ManagedBuffer {
   private final ByteBuf buf;
 
   public NettyManagedBuffer(ByteBuf buf) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 2634d88..e5e6a9e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.ByteArrayChunkOutputStream
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 /**
  * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -107,7 +107,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
     blocks.zipWithIndex.foreach { case (block, i) =>
       val pieceId = BroadcastBlockId(id, "piece" + i)
-      if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
+      val bytes = new ChunkedByteBuffer(block.duplicate())
+      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
         throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
       }
     }
@@ -115,10 +116,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
   }
 
   /** Fetch torrent blocks from the driver and/or other executors. */
-  private def readBlocks(): Array[ByteBuffer] = {
+  private def readBlocks(): Array[ChunkedByteBuffer] = {
     // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
     // to the driver, so other executors can pull these chunks from this executor as well.
-    val blocks = new Array[ByteBuffer](numBlocks)
+    val blocks = new Array[ChunkedByteBuffer](numBlocks)
     val bm = SparkEnv.get.blockManager
 
     for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
@@ -182,7 +183,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
         case None =>
           logInfo("Started reading broadcast variable " + id)
           val startTimeMs = System.currentTimeMillis()
-          val blocks = readBlocks()
+          val blocks = readBlocks().flatMap(_.getChunks())
           logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
 
           val obj = TorrentBroadcast.unBlockifyObject[T](

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6327d55..3201463 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -36,6 +36,7 @@ import org.apache.spark.scheduler.{AccumulableInfo, DirectTaskResult, IndirectTa
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Spark executor, backed by a threadpool to run tasks.
@@ -297,7 +298,9 @@ private[spark] class Executor(
           } else if (resultSize > maxDirectResultSize) {
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
-              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+              blockId,
+              new ChunkedByteBuffer(serializedDirectResult.duplicate()),
+              StorageLevel.MEMORY_AND_DISK_SER)
             logInfo(
               s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
             ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
index 7eb6d53..873f1b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -83,7 +83,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
                 return
               }
               val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
-                serializedTaskResult.get)
+                serializedTaskResult.get.toByteBuffer)
               sparkEnv.blockManager.master.removeBlock(blockId)
               (deserializedResult, size)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3bbdf48..aa2561d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.storage
 
 import java.io._
-import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.ByteBuffer
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -26,15 +26,13 @@ import scala.concurrent.duration._
 import scala.util.Random
 import scala.util.control.NonFatal
 
-import sun.nio.ch.DirectBuffer
-
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.memory.MemoryManager
 import org.apache.spark.network._
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.ExternalShuffleClient
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
@@ -43,6 +41,7 @@ import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
 import org.apache.spark.util._
+import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
 
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
@@ -296,7 +295,7 @@ private[spark] class BlockManager(
    * Put the block locally, using the given storage level.
    */
   override def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean = {
-    putBytes(blockId, data.nioByteBuffer(), level)
+    putBytes(blockId, new ChunkedByteBuffer(data.nioByteBuffer()), level)
   }
 
   /**
@@ -444,7 +443,7 @@ private[spark] class BlockManager(
   /**
    * Get block from the local block manager as serialized bytes.
    */
-  def getLocalBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     logDebug(s"Getting local block $blockId as bytes")
     // As an optimization for map output fetches, if the block is for a shuffle, return it
     // without acquiring a lock; the disk store never deletes (recent) items so this should work
@@ -453,7 +452,8 @@ private[spark] class BlockManager(
       // TODO: This should gracefully handle case where local block is not available. Currently
       // downstream code will throw an exception.
       Option(
-        shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
+        new ChunkedByteBuffer(
+          shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer()))
     } else {
       blockInfoManager.lockForReading(blockId).map { info => doGetLocalBytes(blockId, info) }
     }
@@ -465,7 +465,7 @@ private[spark] class BlockManager(
    * Must be called while holding a read lock on the block.
    * Releases the read lock upon exception; keeps the read lock upon successful return.
    */
-  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
+  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ChunkedByteBuffer = {
     val level = info.level
     logDebug(s"Level for block $blockId is $level")
     // In order, try to read the serialized bytes from memory, then from disk, then fall back to
@@ -504,7 +504,7 @@ private[spark] class BlockManager(
    */
   def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
     getRemoteBytes(blockId).map { data =>
-      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.limit())
+      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
     }
   }
 
@@ -521,7 +521,7 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers as serialized bytes.
    */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
     logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     var runningFailureCount = 0
@@ -567,7 +567,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        return Some(data)
+        return Some(new ChunkedByteBuffer(data))
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -705,7 +705,7 @@ private[spark] class BlockManager(
    */
   def putBytes(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      bytes: ChunkedByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
@@ -725,7 +725,7 @@ private[spark] class BlockManager(
    */
   private def doPutBytes(
       blockId: BlockId,
-      bytes: ByteBuffer,
+      bytes: ChunkedByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
       keepReadLock: Boolean = false): Boolean = {
@@ -734,25 +734,22 @@ private[spark] class BlockManager(
       // Since we're storing bytes, initiate the replication before storing them locally.
       // This is faster as data is already serialized and ready to send.
       val replicationFuture = if (level.replication > 1) {
-        // Duplicate doesn't copy the bytes, but just creates a wrapper
-        val bufferView = bytes.duplicate()
         Future {
           // This is a blocking action and should run in futureExecutionContext which is a cached
           // thread pool
-          replicate(blockId, bufferView, level)
+          replicate(blockId, bytes, level)
         }(futureExecutionContext)
       } else {
         null
       }
 
-      bytes.rewind()
-      val size = bytes.limit()
+      val size = bytes.size
 
       if (level.useMemory) {
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
-          val values = dataDeserialize(blockId, bytes.duplicate())
+          val values = dataDeserialize(blockId, bytes)
           memoryStore.putIterator(blockId, values, level) match {
             case Right(_) => true
             case Left(iter) =>
@@ -922,7 +919,7 @@ private[spark] class BlockManager(
           try {
             replicate(blockId, bytesToReplicate, level)
           } finally {
-            BlockManager.dispose(bytesToReplicate)
+            bytesToReplicate.dispose()
           }
           logDebug("Put block %s remotely took %s"
             .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
@@ -944,29 +941,27 @@ private[spark] class BlockManager(
       blockInfo: BlockInfo,
       blockId: BlockId,
       level: StorageLevel,
-      diskBytes: ByteBuffer): ByteBuffer = {
+      diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
     require(!level.deserialized)
     if (level.useMemory) {
       // Synchronize on blockInfo to guard against a race condition where two readers both try to
       // put values read from disk into the MemoryStore.
       blockInfo.synchronized {
         if (memoryStore.contains(blockId)) {
-          BlockManager.dispose(diskBytes)
+          diskBytes.dispose()
           memoryStore.getBytes(blockId).get
         } else {
-          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.limit(), () => {
+          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we
             // cannot put it into MemoryStore, copyForMemory should not be created. That's why
-            // this action is put into a `() => ByteBuffer` and created lazily.
-            val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
-            copyForMemory.put(diskBytes)
+            // this action is put into a `() => ChunkedByteBuffer` and created lazily.
+            diskBytes.copy()
           })
           if (putSucceeded) {
-            BlockManager.dispose(diskBytes)
+            diskBytes.dispose()
             memoryStore.getBytes(blockId).get
           } else {
-            diskBytes.rewind()
             diskBytes
           }
         }
@@ -1032,7 +1027,7 @@ private[spark] class BlockManager(
    * Replicate block to another node. Not that this is a blocking call that returns after
    * the block has been replicated.
    */
-  private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+  private def replicate(blockId: BlockId, data: ChunkedByteBuffer, level: StorageLevel): Unit = {
     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
     val numPeersToReplicateTo = level.replication - 1
     val peersForReplication = new ArrayBuffer[BlockManagerId]
@@ -1085,11 +1080,15 @@ private[spark] class BlockManager(
         case Some(peer) =>
           try {
             val onePeerStartTime = System.currentTimeMillis
-            data.rewind()
-            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
             blockTransferService.uploadBlockSync(
-              peer.host, peer.port, peer.executorId, blockId, new NioManagedBuffer(data), tLevel)
-            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %s ms"
+              peer.host,
+              peer.port,
+              peer.executorId,
+              blockId,
+              new NettyManagedBuffer(data.toNetty),
+              tLevel)
+            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
               .format(System.currentTimeMillis - onePeerStartTime))
             peersReplicatedTo += peer
             peersForReplication -= peer
@@ -1112,7 +1111,7 @@ private[spark] class BlockManager(
       }
     }
     val timeTakeMs = (System.currentTimeMillis - startTime)
-    logDebug(s"Replicating $blockId of ${data.limit()} bytes to " +
+    logDebug(s"Replicating $blockId of ${data.size} bytes to " +
       s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
@@ -1154,7 +1153,7 @@ private[spark] class BlockManager(
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: () => Either[Array[Any], ByteBuffer]): StorageLevel = {
+      data: () => Either[Array[Any], ChunkedByteBuffer]): StorageLevel = {
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfoManager.assertBlockIsLockedForWriting(blockId)
     var blockIsUpdated = false
@@ -1281,11 +1280,11 @@ private[spark] class BlockManager(
     ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
   }
 
-  /** Serializes into a byte buffer. */
-  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ByteBuffer = {
-    val byteStream = new ByteBufferOutputStream(4096)
-    dataSerializeStream(blockId, byteStream, values)
-    byteStream.toByteBuffer
+  /** Serializes into a chunked byte buffer. */
+  def dataSerialize(blockId: BlockId, values: Iterator[Any]): ChunkedByteBuffer = {
+    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
+    dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
+    new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
   }
 
   /**
@@ -1298,6 +1297,14 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
+   * the iterator is reached.
+   */
+  def dataDeserialize(blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[Any] = {
+    dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))
+  }
+
+  /**
    * Deserializes a InputStream into an iterator of values and disposes of it when the end of
    * the iterator is reached.
    */
@@ -1325,24 +1332,9 @@ private[spark] class BlockManager(
 }
 
 
-private[spark] object BlockManager extends Logging {
+private[spark] object BlockManager {
   private val ID_GENERATOR = new IdGenerator
 
-  /**
-   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
-   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
-   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
-   * unfortunately no standard API to do this.
-   */
-  def dispose(buffer: ByteBuffer): Unit = {
-    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
-      logTrace(s"Unmapping $buffer")
-      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
-        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
-      }
-    }
-  }
-
   def blockIdsToHosts(
       blockIds: Array[BlockId],
       env: SparkEnv,

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
index 5886b9c..12594e6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.storage
 
-import java.nio.ByteBuffer
-
-import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
- * This [[ManagedBuffer]] wraps a [[ByteBuffer]] which was retrieved from the [[BlockManager]]
+ * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
  * so that the corresponding block's read lock can be released once this buffer's references
  * are released.
  *
@@ -32,7 +31,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 private[storage] class BlockManagerManagedBuffer(
     blockManager: BlockManager,
     blockId: BlockId,
-    buf: ByteBuffer) extends NioManagedBuffer(buf) {
+    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
 
   override def retain(): ManagedBuffer = {
     super.retain()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 5c28357..ca23e23 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -26,6 +26,7 @@ import com.google.common.io.Closeables
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
@@ -71,23 +72,18 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       finishTime - startTime))
   }
 
-  def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val bytes = _bytes.duplicate()
+  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        while (bytes.remaining > 0) {
-          channel.write(bytes)
-        }
+        bytes.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
-  def getBytes(blockId: BlockId): ByteBuffer = {
+  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
     val file = diskManager.getFile(blockId.name)
     val channel = new RandomAccessFile(file, "r").getChannel
     Utils.tryWithSafeFinally {
@@ -102,9 +98,9 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
           }
         }
         buf.flip()
-        buf
+        new ChunkedByteBuffer(buf)
       } else {
-        channel.map(MapMode.READ_ONLY, 0, file.length)
+        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
       }
     } {
       channel.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 43cd159..199a5fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.storage
 
+import java.nio.{ByteBuffer, MappedByteBuffer}
+
 import scala.collection.Map
 import scala.collection.mutable
 
+import sun.nio.ch.DirectBuffer
+
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.Logging
 
 /**
  * :: DeveloperApi ::
@@ -222,7 +227,22 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
 }
 
 /** Helper methods for storage-related objects. */
-private[spark] object StorageUtils {
+private[spark] object StorageUtils extends Logging {
+
+  /**
+   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
+   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+   * unfortunately no standard API to do this.
+   */
+  def dispose(buffer: ByteBuffer): Unit = {
+    if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
+      logTrace(s"Unmapping $buffer")
+      if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
+        buffer.asInstanceOf[DirectBuffer].cleaner().clean()
+      }
+    }
+  }
 
   /**
    * Update the given list of RDDInfo with the given list of storage statuses.

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index a7c1854..9417132 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.storage.memory
 
-import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
 import scala.collection.mutable
@@ -29,8 +28,13 @@ import org.apache.spark.memory.MemoryManager
 import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
+import org.apache.spark.util.io.ChunkedByteBuffer
 
-private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
+private sealed trait MemoryEntry {
+  val size: Long
+}
+private case class DeserializedMemoryEntry(value: Array[Any], size: Long) extends MemoryEntry
+private case class SerializedMemoryEntry(buffer: ChunkedByteBuffer, size: Long) extends MemoryEntry
 
 /**
  * Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -91,14 +95,13 @@ private[spark] class MemoryStore(
    *
    * @return true if the put() succeeded, false otherwise.
    */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Boolean = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     if (memoryManager.acquireStorageMemory(blockId, size)) {
       // We acquired enough memory for the block, so go ahead and put it
-      // Work on a duplicate - since the original input might be used elsewhere.
-      val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-      assert(bytes.limit == size)
-      val entry = new MemoryEntry(bytes, size, deserialized = false)
+      val bytes = _bytes()
+      assert(bytes.size == size)
+      val entry = new SerializedMemoryEntry(bytes, size)
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -184,10 +187,10 @@ private[spark] class MemoryStore(
       val arrayValues = vector.toArray
       vector = null
       val entry = if (level.deserialized) {
-        new MemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues), deserialized = true)
+        new DeserializedMemoryEntry(arrayValues, SizeEstimator.estimate(arrayValues))
       } else {
         val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
-        new MemoryEntry(bytes, bytes.limit, deserialized = false)
+        new SerializedMemoryEntry(bytes, bytes.size)
       }
       val size = entry.size
       def transferUnrollToStorage(amount: Long): Unit = {
@@ -241,27 +244,23 @@ private[spark] class MemoryStore(
     }
   }
 
-  def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else {
-      require(!entry.deserialized, "should only call getBytes on blocks stored in serialized form")
-      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
+  def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
+    val entry = entries.synchronized { entries.get(blockId) }
+    entry match {
+      case null => None
+      case e: DeserializedMemoryEntry =>
+        throw new IllegalArgumentException("should only call getBytes on serialized blocks")
+      case SerializedMemoryEntry(bytes, _) => Some(bytes)
     }
   }
 
   def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else {
-      require(entry.deserialized, "should only call getValues on deserialized blocks")
-      Some(entry.value.asInstanceOf[Array[Any]].iterator)
+    val entry = entries.synchronized { entries.get(blockId) }
+    entry match {
+      case null => None
+      case e: SerializedMemoryEntry =>
+        throw new IllegalArgumentException("should only call getValues on deserialized blocks")
+      case DeserializedMemoryEntry(values, _) => Some(values.iterator)
     }
   }
 
@@ -342,10 +341,9 @@ private[spark] class MemoryStore(
           // blocks and removing entries. However the check is still here for
           // future safety.
           if (entry != null) {
-            val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[Array[Any]])
-            } else {
-              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
+            val data = entry match {
+              case DeserializedMemoryEntry(values, _) => Left(values)
+              case SerializedMemoryEntry(buffer, _) => Right(buffer)
             }
             val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
             if (newEffectiveStorageLevel.isValid) {

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
index 54de4d4..dce2ac6 100644
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferInputStream.scala
@@ -20,10 +20,10 @@ package org.apache.spark.util
 import java.io.InputStream
 import java.nio.ByteBuffer
 
-import org.apache.spark.storage.BlockManager
+import org.apache.spark.storage.StorageUtils
 
 /**
- * Reads data from a ByteBuffer, and optionally cleans it up using BlockManager.dispose()
+ * Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
  * at the end of the stream (e.g. to close a memory-mapped file).
  */
 private[spark]
@@ -68,12 +68,12 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = f
   }
 
   /**
-   * Clean up the buffer, and potentially dispose of it using BlockManager.dispose().
+   * Clean up the buffer, and potentially dispose of it using StorageUtils.dispose().
    */
   private def cleanUp() {
     if (buffer != null) {
       if (dispose) {
-        BlockManager.dispose(buffer)
+        StorageUtils.dispose(buffer)
       }
       buffer = null
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
new file mode 100644
index 0000000..c643c4b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.io
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+import com.google.common.primitives.UnsignedBytes
+import io.netty.buffer.{ByteBuf, Unpooled}
+
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.storage.StorageUtils
+
+/**
+ * Read-only byte buffer which is physically stored as multiple chunks rather than a single
+ * contiguous array.
+ *
+ * @param chunks an array of [[ByteBuffer]]s. Each buffer in this array must be non-empty and have
+ *               position == 0. Ownership of these buffers is transferred to the ChunkedByteBuffer,
+ *               so if these buffers may also be used elsewhere then the caller is responsible for
+ *               copying them as needed.
+ */
+private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
+  require(chunks != null, "chunks must not be null")
+  require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
+  require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+
+  /**
+   * This size of this buffer, in bytes.
+   */
+  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
+
+  def this(byteBuffer: ByteBuffer) = {
+    this(Array(byteBuffer))
+  }
+
+  /**
+   * Write this buffer to a channel.
+   */
+  def writeFully(channel: WritableByteChannel): Unit = {
+    for (bytes <- getChunks()) {
+      while (bytes.remaining > 0) {
+        channel.write(bytes)
+      }
+    }
+  }
+
+  /**
+   * Wrap this buffer to view it as a Netty ByteBuf.
+   */
+  def toNetty: ByteBuf = {
+    Unpooled.wrappedBuffer(getChunks(): _*)
+  }
+
+  /**
+   * Copy this buffer into a new byte array.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
+   */
+  def toArray: Array[Byte] = {
+    if (size >= Integer.MAX_VALUE) {
+      throw new UnsupportedOperationException(
+        s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
+    }
+    val byteChannel = new ByteArrayWritableChannel(size.toInt)
+    writeFully(byteChannel)
+    byteChannel.close()
+    byteChannel.getData
+  }
+
+  /**
+   * Copy this buffer into a new ByteBuffer.
+   *
+   * @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
+   */
+  def toByteBuffer: ByteBuffer = {
+    if (chunks.length == 1) {
+      chunks.head.duplicate()
+    } else {
+      ByteBuffer.wrap(toArray)
+    }
+  }
+
+  /**
+   * Creates an input stream to read data from this ChunkedByteBuffer.
+   *
+   * @param dispose if true, [[dispose()]] will be called at the end of the stream
+   *                in order to close any memory-mapped files which back this buffer.
+   */
+  def toInputStream(dispose: Boolean = false): InputStream = {
+    new ChunkedByteBufferInputStream(this, dispose)
+  }
+
+  /**
+   * Get duplicates of the ByteBuffers backing this ChunkedByteBuffer.
+   */
+  def getChunks(): Array[ByteBuffer] = {
+    chunks.map(_.duplicate())
+  }
+
+  /**
+   * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
+   * The new buffer will share no resources with the original buffer.
+   */
+  def copy(): ChunkedByteBuffer = {
+    val copiedChunks = getChunks().map { chunk =>
+      // TODO: accept an allocator in this copy method to integrate with mem. accounting systems
+      val newChunk = ByteBuffer.allocate(chunk.limit())
+      newChunk.put(chunk)
+      newChunk.flip()
+      newChunk
+    }
+    new ChunkedByteBuffer(copiedChunks)
+  }
+
+  /**
+   * Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
+   * might cause errors if one attempts to read from the unmapped buffer, but it's better than
+   * waiting for the GC to find it because that could lead to huge numbers of open files. There's
+   * unfortunately no standard API to do this.
+   */
+  def dispose(): Unit = {
+    chunks.foreach(StorageUtils.dispose)
+  }
+}
+
+/**
+ * Reads data from a ChunkedByteBuffer.
+ *
+ * @param dispose if true, [[ChunkedByteBuffer.dispose()]] will be called at the end of the stream
+ *                in order to close any memory-mapped files which back the buffer.
+ */
+private class ChunkedByteBufferInputStream(
+    var chunkedByteBuffer: ChunkedByteBuffer,
+    dispose: Boolean)
+  extends InputStream {
+
+  private[this] var chunks = chunkedByteBuffer.getChunks().iterator
+  private[this] var currentChunk: ByteBuffer = {
+    if (chunks.hasNext) {
+      chunks.next()
+    } else {
+      null
+    }
+  }
+
+  override def read(): Int = {
+    if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
+      currentChunk = chunks.next()
+    }
+    if (currentChunk != null && currentChunk.hasRemaining) {
+      UnsignedBytes.toInt(currentChunk.get())
+    } else {
+      close()
+      -1
+    }
+  }
+
+  override def read(dest: Array[Byte], offset: Int, length: Int): Int = {
+    if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) {
+      currentChunk = chunks.next()
+    }
+    if (currentChunk != null && currentChunk.hasRemaining) {
+      val amountToGet = math.min(currentChunk.remaining(), length)
+      currentChunk.get(dest, offset, amountToGet)
+      amountToGet
+    } else {
+      close()
+      -1
+    }
+  }
+
+  override def skip(bytes: Long): Long = {
+    if (currentChunk != null) {
+      val amountToSkip = math.min(bytes, currentChunk.remaining).toInt
+      currentChunk.position(currentChunk.position + amountToSkip)
+      if (currentChunk.remaining() == 0) {
+        if (chunks.hasNext) {
+          currentChunk = chunks.next()
+        } else {
+          close()
+        }
+      }
+      amountToSkip
+    } else {
+      0L
+    }
+  }
+
+  override def close(): Unit = {
+    if (chunkedByteBuffer != null && dispose) {
+      chunkedByteBuffer.dispose()
+    }
+    chunkedByteBuffer = null
+    chunks = null
+    currentChunk = null
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
new file mode 100644
index 0000000..aab70e7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.io
+
+import java.nio.ByteBuffer
+
+import com.google.common.io.ByteStreams
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class ChunkedByteBufferSuite extends SparkFunSuite {
+
+  test("no chunks") {
+    val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
+    assert(emptyChunkedByteBuffer.size === 0)
+    assert(emptyChunkedByteBuffer.getChunks().isEmpty)
+    assert(emptyChunkedByteBuffer.toArray === Array.empty)
+    assert(emptyChunkedByteBuffer.toByteBuffer.capacity() === 0)
+    assert(emptyChunkedByteBuffer.toNetty.capacity() === 0)
+    emptyChunkedByteBuffer.toInputStream(dispose = false).close()
+    emptyChunkedByteBuffer.toInputStream(dispose = true).close()
+  }
+
+  test("chunks must be non-empty") {
+    intercept[IllegalArgumentException] {
+      new ChunkedByteBuffer(Array(ByteBuffer.allocate(0)))
+    }
+  }
+
+  test("getChunks() duplicates chunks") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.getChunks().head.position(4)
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("copy() does not affect original buffer's position") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.copy()
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("writeFully() does not affect original buffer's position") {
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
+    chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+
+  test("toArray()") {
+    val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes, bytes))
+    assert(chunkedByteBuffer.toArray === bytes.array() ++ bytes.array())
+  }
+
+  test("toArray() throws UnsupportedOperationException if size exceeds 2GB") {
+    val fourMegabyteBuffer = ByteBuffer.allocate(1024 * 1024 * 4)
+    fourMegabyteBuffer.limit(fourMegabyteBuffer.capacity())
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array.fill(1024)(fourMegabyteBuffer))
+    assert(chunkedByteBuffer.size === (1024L * 1024L * 1024L * 4L))
+    intercept[UnsupportedOperationException] {
+      chunkedByteBuffer.toArray
+    }
+  }
+
+  test("toInputStream()") {
+    val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
+    val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(bytes1, bytes2))
+    assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
+
+    val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
+    val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
+    ByteStreams.readFully(inputStream, bytesFromStream)
+    assert(bytesFromStream === bytes1.array() ++ bytes2.array())
+    assert(chunkedByteBuffer.getChunks().head.position() === 0)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2e0c059..edf5cd3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.shuffle.hash.HashShuffleManager
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
   with PrivateMethodTester with ResetSystemProperties {
@@ -192,8 +193,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
-    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+    store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
     assert(store.getSingleAndReleaseLock("a1") === None, "a1 not removed from store")
     assert(store.getSingleAndReleaseLock("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -434,8 +435,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       t2.join()
       t3.join()
 
-      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ByteBuffer])
-      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ByteBuffer])
+      store.dropFromMemoryIfExists("a1", () => null: Either[Array[Any], ChunkedByteBuffer])
+      store.dropFromMemoryIfExists("a2", () => null: Either[Array[Any], ChunkedByteBuffer])
       store.waitForAsyncReregister()
     }
   }
@@ -1253,9 +1254,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
-    var bytes: ByteBuffer = null
+    var bytes: ChunkedByteBuffer = null
     memoryStore.putBytes(blockId, 10000, () => {
-      bytes = ByteBuffer.allocate(10000)
+      bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
       bytes
     })
     assert(memoryStore.getSize(blockId) === 10000)
@@ -1364,7 +1365,7 @@ private object BlockManagerSuite {
 
     def dropFromMemoryIfExists(
         blockId: BlockId,
-        data: () => Either[Array[Any], ByteBuffer]): Unit = {
+        data: () => Either[Array[Any], ChunkedByteBuffer]): Unit = {
       store.blockInfoManager.lockForWriting(blockId).foreach { info =>
         val newEffectiveStorageLevel = store.dropFromMemory(blockId, data)
         if (newEffectiveStorageLevel.isValid) {
@@ -1394,7 +1395,9 @@ private object BlockManagerSuite {
     val getLocalAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.getLocalValues)
     val getAndReleaseLock: (BlockId) => Option[BlockResult] = wrapGet(store.get)
     val getSingleAndReleaseLock: (BlockId) => Option[Any] = wrapGet(store.getSingle)
-    val getLocalBytesAndReleaseLock: (BlockId) => Option[ByteBuffer] = wrapGet(store.getLocalBytes)
+    val getLocalBytesAndReleaseLock: (BlockId) => Option[ChunkedByteBuffer] = {
+      wrapGet(store.getLocalBytes)
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 97e74fe..9ed5016 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -21,6 +21,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.util.Arrays
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 class DiskStoreSuite extends SparkFunSuite {
 
@@ -29,7 +30,7 @@ class DiskStoreSuite extends SparkFunSuite {
 
     // Create a non-trivial (not all zeros) byte array
     val bytes = Array.tabulate[Byte](1000)(_.toByte)
-    val byteBuffer = ByteBuffer.wrap(bytes)
+    val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes))
 
     val blockId = BlockId("rdd_1_2")
     val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
@@ -44,9 +45,10 @@ class DiskStoreSuite extends SparkFunSuite {
     val notMapped = diskStoreNotMapped.getBytes(blockId)
 
     // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
-    assert(notMapped.getClass.getName.endsWith("HeapByteBuffer"),
+    assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
       "Expected HeapByteBuffer for un-mapped read")
-    assert(mapped.isInstanceOf[MappedByteBuffer], "Expected MappedByteBuffer for mapped read")
+    assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
+      "Expected MappedByteBuffer for mapped read")
 
     def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
       val array = new Array[Byte](in.remaining())
@@ -54,9 +56,7 @@ class DiskStoreSuite extends SparkFunSuite {
       array
     }
 
-    val mappedAsArray = arrayFromByteBuffer(mapped)
-    val notMappedAsArray = arrayFromByteBuffer(notMapped)
-    assert(Arrays.equals(mappedAsArray, bytes))
-    assert(Arrays.equals(notMappedAsArray, bytes))
+    assert(Arrays.equals(mapped.toArray, bytes))
+    assert(Arrays.equals(notMapped.toArray, bytes))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index f811784..8625882 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -28,11 +28,13 @@ import org.apache.spark.rdd.BlockRDD
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.streaming.util._
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
  * the corresponding record handle in the write ahead log that backs the partition.
+ *
  * @param index index of the partition
  * @param blockId id of the block having the partition data
  * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
@@ -59,7 +61,6 @@ class WriteAheadLogBackedBlockRDDPartition(
  * correctness, and it can be used in situations where it is known that the block
  * does not exist in the Spark executors (e.g. after a failed driver is restarted).
  *
- *
  * @param sc SparkContext
  * @param _blockIds Ids of the blocks that contains this RDD's data
  * @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
@@ -156,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
       logInfo(s"Read partition data of $this from write ahead log, record handle " +
         partition.walRecordHandle)
       if (storeInBlockManager) {
-        blockManager.putBytes(blockId, dataRead, storageLevel)
+        blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
         logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
         dataRead.rewind()
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 4880884..6d4f4b9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -30,6 +30,7 @@ import org.apache.spark.storage._
 import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
 import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.io.ChunkedByteBuffer
 
 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
@@ -84,7 +85,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
         numRecords = countIterator.count
         putResult
       case ByteBufferBlock(byteBuffer) =>
-        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+        blockManager.putBytes(
+          blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)
       case o =>
         throw new SparkException(
           s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
@@ -178,15 +180,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
         numRecords = countIterator.count
         serializedBlock
       case ByteBufferBlock(byteBuffer) =>
-        byteBuffer
+        new ChunkedByteBuffer(byteBuffer.duplicate())
       case _ =>
         throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
     }
 
     // Store the block in block manager
     val storeInBlockManagerFuture = Future {
-      val putSucceeded =
-        blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
+      val putSucceeded = blockManager.putBytes(
+        blockId,
+        serializedBlock,
+        effectiveStorageLevel,
+        tellMaster = true)
       if (!putSucceeded) {
         throw new SparkException(
           s"Could not store $blockId to block manager with storage level $storageLevel")
@@ -195,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
 
     // Store the block in write ahead log
     val storeInWriteAheadLogFuture = Future {
-      writeAheadLog.write(serializedBlock, clock.getTimeMillis())
+      writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
     }
 
     // Combine the futures, wait for both to complete, and return the write ahead log record handle

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 2d509af..76f67ed 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -339,7 +339,7 @@ class ReceivedBlockHandlerSuite
 
     storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
     storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
-    storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) })
+    storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
   }
 
   /** Test error handling when blocks that cannot be stored */

http://git-wip-us.apache.org/repos/asf/spark/blob/6c2d894a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 79ac833..c4bf42d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -223,7 +223,7 @@ class WriteAheadLogBackedBlockRDDSuite
     require(blockData.size === blockIds.size)
     val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
     val segments = blockData.zip(blockIds).map { case (data, id) =>
-      writer.write(blockManager.dataSerialize(id, data.iterator))
+      writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
     }
     writer.close()
     segments


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org