Posted to commits@spark.apache.org by va...@apache.org on 2017/09/25 19:02:36 UTC

spark git commit: [SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

Repository: spark
Updated Branches:
  refs/heads/master 365a29bdb -> 2c5b9b117


[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

## What changes were proposed in this pull request?

MemoryStore.evictBlocksToFreeSpace acquires write locks up front for all
the blocks it intends to evict.  If evicting a block fails (e.g., a
failure while dropping a block to disk), we have to release the locks on
the blocks that were not dropped.  Otherwise those locks are never
released, and an executor trying to acquire one of them will wait
forever.
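
To make the fix concrete, here is a minimal sketch of the pattern the
patch applies (the EvictionSketch object and the dropBlock/unlock
helpers are illustrative stand-ins, not the actual MemoryStore
internals): record the index of the last block dropped successfully,
and unlock every remaining block in a finally clause.

    object EvictionSketch {
      // Hypothetical stand-ins for BlockInfoManager.unlock and the real
      // drop-to-disk logic; only the control flow matters here.
      private val locked = scala.collection.mutable.Set[String]()
      def dropBlock(id: String): Unit =
        if (id == "bad") throw new RuntimeException(s"failed dropping $id")
      def unlock(id: String): Unit = locked -= id

      def evictSelected(selectedBlocks: IndexedSeq[String]): Unit = {
        selectedBlocks.foreach(locked += _)  // write locks held up front
        var lastSuccessfulBlock = -1
        try {
          selectedBlocks.indices.foreach { idx =>
            dropBlock(selectedBlocks(idx))   // may throw mid-loop
            lastSuccessfulBlock = idx
          }
        } finally {
          // Use finally (not catch) so InterruptedException needs no
          // special handling; unlock every block not yet dropped.
          (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
            unlock(selectedBlocks(idx))
          }
        }
      }
    }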

## How was this patch tested?

Added a unit test that forces eviction to fail partway through and
verifies that all locks are released.

Author: Imran Rashid <ir...@cloudera.com>

Closes #19311 from squito/SPARK-22083.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5b9b11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5b9b11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5b9b11

Branch: refs/heads/master
Commit: 2c5b9b1173c23f6ca8890817a9a35dc7557b0776
Parents: 365a29b
Author: Imran Rashid <ir...@cloudera.com>
Authored: Mon Sep 25 12:02:30 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Sep 25 12:02:30 2017 -0700

----------------------------------------------------------------------
 .../spark/storage/memory/MemoryStore.scala      |  47 ++++++--
 .../apache/spark/storage/MemoryStoreSuite.scala | 119 +++++++++++++++++++
 2 files changed, 153 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b9b11/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index eb2201d..651e9c7 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
-import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
+import org.apache.spark.storage._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
@@ -544,20 +544,38 @@ private[spark] class MemoryStore(
       }
 
       if (freedMemory >= space) {
-        logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-          s"(${Utils.bytesToString(freedMemory)} bytes)")
-        for (blockId <- selectedBlocks) {
-          val entry = entries.synchronized { entries.get(blockId) }
-          // This should never be null as only one task should be dropping
-          // blocks and removing entries. However the check is still here for
-          // future safety.
-          if (entry != null) {
-            dropBlock(blockId, entry)
+        var lastSuccessfulBlock = -1
+        try {
+          logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+            s"(${Utils.bytesToString(freedMemory)} bytes)")
+          (0 until selectedBlocks.size).foreach { idx =>
+            val blockId = selectedBlocks(idx)
+            val entry = entries.synchronized {
+              entries.get(blockId)
+            }
+            // This should never be null as only one task should be dropping
+            // blocks and removing entries. However the check is still here for
+            // future safety.
+            if (entry != null) {
+              dropBlock(blockId, entry)
+              afterDropAction(blockId)
+            }
+            lastSuccessfulBlock = idx
+          }
+          logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+            s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
+          freedMemory
+        } finally {
+          // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal
+          // with InterruptedException
+          if (lastSuccessfulBlock != selectedBlocks.size - 1) {
+            // the blocks we didn't process successfully are still locked, so we have to unlock them
+            (lastSuccessfulBlock + 1 until selectedBlocks.size).foreach { idx =>
+              val blockId = selectedBlocks(idx)
+              blockInfoManager.unlock(blockId)
+            }
           }
         }
-        logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
-          s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}")
-        freedMemory
       } else {
         blockId.foreach { id =>
           logInfo(s"Will not store $id")
@@ -570,6 +588,9 @@ private[spark] class MemoryStore(
     }
   }
 
+  // hook for testing, so we can simulate a race
+  protected def afterDropAction(blockId: BlockId): Unit = {}
+
   def contains(blockId: BlockId): Boolean = {
     entries.synchronized { entries.containsKey(blockId) }
   }

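The afterDropAction method introduced above is a deliberate test seam:
a protected no-op in production that a test subclass can override to
inject behavior between dropping a block and releasing its lock. A
simplified sketch of the idiom (Store and evictOne are illustrative
names, not Spark's API):

    object TestSeamSketch {
      class Store {
        def evictOne(blockId: String): Unit = {
          // ... drop the block from memory ...
          afterDropAction(blockId)  // hook fires after each successful drop
        }
        // No-op in production; tests override it to simulate a race.
        protected def afterDropAction(blockId: String): Unit = {}
      }

      // A test can subclass anonymously and inject the race:
      val racyStore: Store = new Store {
        override def afterDropAction(blockId: String): Unit = {
          // pretend another thread takes a read lock on the dropped block
          println(s"simulated concurrent reader for $blockId")
        }
      }
    }

MemoryStoreSuite below uses this idiom, overriding afterDropAction to
take a read lock from a simulated second task.
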
http://git-wip-us.apache.org/repos/asf/spark/blob/2c5b9b11/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 9929ea0..7274072 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -407,4 +407,123 @@ class MemoryStoreSuite
     })
     assert(memoryStore.getSize(blockId) === 10000)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+    // Setup a memory store with many blocks cached, and then one request which leads to multiple
+    // blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
+    // all locks are released.
+    val ct = implicitly[ClassTag[Array[Byte]]]
+    val numInitialBlocks = 10
+    val memStoreSize = 100
+    val bytesPerSmallBlock = memStoreSize / numInitialBlocks
+    def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
+      val tc = TaskContext.empty()
+      val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
+      val blockInfoManager = new BlockInfoManager
+      blockInfoManager.registerTask(tc.taskAttemptId)
+      var droppedSoFar = 0
+      val blockEvictionHandler = new BlockEvictionHandler {
+        var memoryStore: MemoryStore = _
+
+        override private[storage] def dropFromMemory[T: ClassTag](
+            blockId: BlockId,
+            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
+          if (droppedSoFar < numValidBlocks) {
+            droppedSoFar += 1
+            memoryStore.remove(blockId)
+            if (readLockAfterDrop) {
+              // for testing purposes, we act like another thread gets the read lock on the new
+              // block
+              StorageLevel.DISK_ONLY
+            } else {
+              StorageLevel.NONE
+            }
+          } else {
+            throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
+          }
+        }
+      }
+      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
+          blockEvictionHandler) {
+        override def afterDropAction(blockId: BlockId): Unit = {
+          if (readLockAfterDrop) {
+            // pretend that we get a read lock on the block (now on disk) in another thread
+            TaskContext.setTaskContext(tc)
+            blockInfoManager.lockForReading(blockId)
+            TaskContext.unset()
+          }
+        }
+      }
+
+      blockEvictionHandler.memoryStore = memoryStore
+      memManager.setMemoryStore(memoryStore)
+
+      // Put in some small blocks to fill up the memory store
+      val initialBlocks = (1 to numInitialBlocks).map { id =>
+        val blockId = BlockId(s"rdd_1_$id")
+        val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+        val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+        assert(initialWriteLock)
+        val success = memoryStore.putBytes(blockId, bytesPerSmallBlock, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(bytesPerSmallBlock))
+        })
+        assert(success)
+        blockInfoManager.unlock(blockId, None)
+      }
+      assert(blockInfoManager.size === numInitialBlocks)
+
+
+      // Add one big block, which will require evicting everything in the memorystore.  However our
+      // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared.
+      val largeBlockId = BlockId(s"rdd_2_1")
+      val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false)
+      val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
+      assert(initialWriteLock)
+      if (numValidBlocks < numInitialBlocks) {
+        val exc = intercept[RuntimeException] {
+          memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
+            new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
+          })
+        }
+        assert(exc.getMessage().startsWith("Mock error dropping block"), exc)
+        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+        // testing that here, so do it manually
+        blockInfoManager.removeBlock(largeBlockId)
+      } else {
+        memoryStore.putBytes(largeBlockId, memStoreSize, MemoryMode.ON_HEAP, () => {
+          new ChunkedByteBuffer(ByteBuffer.allocate(memStoreSize))
+        })
+        // BlockManager.doPut takes care of releasing the lock for the newly written block -- not
+        // testing that here, so do it manually
+        blockInfoManager.unlock(largeBlockId)
+      }
+
+      val largeBlockInMemory = if (numValidBlocks == numInitialBlocks) 1 else 0
+      val expBlocks = numInitialBlocks +
+        (if (readLockAfterDrop) 0 else -numValidBlocks) +
+        largeBlockInMemory
+      assert(blockInfoManager.size === expBlocks)
+
+      val blocksStillInMemory = blockInfoManager.entries.filter { case (id, info) =>
+        assert(info.writerTask === BlockInfo.NO_WRITER, id)
+        // in this test, all the blocks in memory have no reader, but everything dropped to disk
+        // had another thread read the block.  We shouldn't lose the other thread's reader lock.
+        if (memoryStore.contains(id)) {
+          assert(info.readerCount === 0, id)
+          true
+        } else {
+          assert(info.readerCount === 1, id)
+          false
+        }
+      }
+      assert(blocksStillInMemory.size ===
+        (numInitialBlocks - numValidBlocks + largeBlockInMemory))
+    }
+
+    Seq(0, 3, numInitialBlocks).foreach { failAfterDropping =>
+      Seq(true, false).foreach { readLockAfterDropping =>
+        testFailureOnNthDrop(failAfterDropping, readLockAfterDropping)
+      }
+    }
+  }
 }

