Posted to commits@spark.apache.org by jo...@apache.org on 2016/03/08 19:40:40 UTC

spark git commit: [SPARK-13695] Don't cache MEMORY_AND_DISK blocks as bytes in memory after spills

Repository: spark
Updated Branches:
  refs/heads/master 78d3b6051 -> ad3c9a973


[SPARK-13695] Don't cache MEMORY_AND_DISK blocks as bytes in memory after spills

When a cached block is spilled to disk and read back in serialized form (i.e. as bytes), the current BlockManager implementation will attempt to re-insert the serialized block into the MemoryStore even if the block's storage level requests deserialized caching.

This behavior adds some complexity to the MemoryStore, but I don't think it offers much of a performance benefit, and I'd like to remove it in order to simplify a larger refactoring patch. Therefore, this patch changes the behavior so that disk store reads will only cache bytes in the memory store for blocks with serialized storage levels.
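
For reference, the core behavioral change is the caching condition in the BlockManager diff below. A minimal sketch of the new decision, using the same names as that diff (this is an illustration of the condition only, not the full read path):

    import org.apache.spark.storage.StorageLevel

    // `level` is the block's requested storage level; `asBlockResult` is true
    // when the caller asked for deserialized values rather than raw bytes.
    def shouldCacheBytesInMemory(level: StorageLevel, asBlockResult: Boolean): Boolean = {
      // Old condition: !level.deserialized || !asBlockResult -- bytes were
      // re-cached whenever the level was serialized OR the caller requested
      // bytes, even for deserialized storage levels.
      // New condition: both must hold, so the form cached back into the
      // MemoryStore always matches the block's storage level.
      !level.deserialized && !asBlockResult
    }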

There are two places where we request serialized bytes from the BlockStore:

1. getLocalBytes(), which is only called when reading local copies of TorrentBroadcast pieces. Broadcast pieces are always cached using a serialized storage level, so this won't lead to a mismatch in serialization forms if spilled bytes read from disk are cached as bytes in the memory store.
2. the non-shuffle-block branch in getBlockData(), which is only called by the NettyBlockRpcServer when responding to requests to read remote blocks. Caching the serialized bytes in memory will only benefit us if those cached bytes are read before they're evicted, and the likelihood of that happening seems low because remote reads of non-broadcast cached blocks appear to be very infrequent. Caching these bytes when they have a low probability of being read is harmful if it risks evicting blocks that are cached in their expected serialized or deserialized forms, since those blocks seem more likely to be read in local computation.

Given the argument above, I think this change is unlikely to cause performance regressions.
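
Concretely, the expected behavior is captured by the updated testDiskAndMemoryStorage in the diff below: a block read back from disk is re-cached in memory only when the form read back matches its storage level. A hedged summary using the test's own condition (`getAsBytes` is true for getLocalBytes-style reads, false for value reads):

    import org.apache.spark.storage.StorageLevel

    // Whether a spilled block should re-enter the MemoryStore after being
    // read back from disk, per the updated test assertions:
    def recachedAfterDiskRead(storageLevel: StorageLevel, getAsBytes: Boolean): Boolean =
      if (storageLevel.deserialized) !getAsBytes else getAsBytes

    // MEMORY_AND_DISK     + value read => re-cached (as values)
    // MEMORY_AND_DISK     + byte read  => not re-cached
    // MEMORY_AND_DISK_SER + value read => not re-cached
    // MEMORY_AND_DISK_SER + byte read  => re-cached (as bytes)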

Author: Josh Rosen <jo...@databricks.com>

Closes #11533 from JoshRosen/remove-memorystore-level-mismatch.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3c9a97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3c9a97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3c9a97

Branch: refs/heads/master
Commit: ad3c9a9730535faa6d270ee83412f79ec3db8333
Parents: 78d3b60
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Mar 8 10:40:27 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Mar 8 10:40:27 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala |  5 ++--
 .../org/apache/spark/storage/MemoryStore.scala  |  4 +++
 .../spark/storage/BlockManagerSuite.scala       | 30 +++++++++++++-------
 3 files changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad3c9a97/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dcf359e..b38e2ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -495,10 +495,9 @@ private[spark] class BlockManager(
         }
       } else {
         // Otherwise, we also have to store something in the memory store
-        if (!level.deserialized || !asBlockResult) {
+        if (!level.deserialized && !asBlockResult) {
           /* We'll store the bytes in memory if the block's storage level includes
-           * "memory serialized", or if it should be cached as objects in memory
-           * but we only requested its serialized bytes. */
+           * "memory serialized" and we requested its serialized bytes. */
           memoryStore.putBytes(blockId, bytes.limit, () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will happen. So if we cannot

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3c9a97/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 12b70d1..bb72fe4 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -88,6 +88,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
   }
 
   override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()
@@ -106,6 +107,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
    * The caller should guarantee that `size` is correct.
    */
   def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     // Work on a duplicate - since the original input might be used elsewhere.
     lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
     val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
@@ -118,6 +120,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     putIterator(blockId, values, level, allowPersistToDisk = true)
   }
 
@@ -138,6 +141,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
       values: Iterator[Any],
       level: StorageLevel,
       allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
+    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
     val unrolledValues = unrollSafely(blockId, values)
     unrolledValues match {
       case Left(arrayValues) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3c9a97/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cfcbf17..0485b05 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -585,36 +585,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   }
 
   test("disk and memory storage") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = false)
   }
 
   test("disk and memory storage with getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK, getAsBytes = true)
   }
 
   test("disk and memory storage with serialization") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getSingleAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = false)
   }
 
   test("disk and memory storage with serialization and getLocalBytes") {
-    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, _.getLocalBytesAndReleaseLock)
+    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
   }
 
   def testDiskAndMemoryStorage(
       storageLevel: StorageLevel,
-      accessMethod: BlockManager => BlockId => Option[_]): Unit = {
+      getAsBytes: Boolean): Unit = {
     store = makeBlockManager(12000)
+    val accessMethod =
+      if (getAsBytes) store.getLocalBytesAndReleaseLock else store.getSingleAndReleaseLock
     val a1 = new Array[Byte](4000)
     val a2 = new Array[Byte](4000)
     val a3 = new Array[Byte](4000)
     store.putSingle("a1", a1, storageLevel)
     store.putSingle("a2", a2, storageLevel)
     store.putSingle("a3", a3, storageLevel)
-    assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
-    assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
-    assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
-    assert(accessMethod(store)("a1").isDefined, "a1 was not in store")
-    assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
+    assert(accessMethod("a2").isDefined, "a2 was not in store")
+    assert(accessMethod("a3").isDefined, "a3 was not in store")
+    assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    assert(accessMethod("a1").isDefined, "a1 was not in store")
+    val dataShouldHaveBeenCachedBackIntoMemory = {
+      if (storageLevel.deserialized) !getAsBytes
+      else getAsBytes
+    }
+    if (dataShouldHaveBeenCachedBackIntoMemory) {
+      assert(store.memoryStore.contains("a1"), "a1 was not in memory store")
+    } else {
+      assert(!store.memoryStore.contains("a1"), "a1 was in memory store")
+    }
   }
 
   test("LRU with mixed storage levels") {

