You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/02/03 04:35:37 UTC

spark git commit: [SPARK-13122] Fix race condition in MemoryStore.unrollSafely()

Repository: spark
Updated Branches:
  refs/heads/master 21112e8a1 -> ff71261b6


[SPARK-13122] Fix race condition in MemoryStore.unrollSafely()

https://issues.apache.org/jira/browse/SPARK-13122

A race condition can occur in MemoryStore's unrollSafely() method if two threads that
return the same value for currentTaskAttemptId() execute this method concurrently. This
change makes the operation of reading the initial amount of unroll memory used, performing
the unroll, and updating the associated memory maps atomic in order to avoid this race
condition.

The initial proposed fix wraps all of unrollSafely() in a memoryManager.synchronized { } block. A cleaner approach might be to introduce a mechanism that synchronizes based on task attempt ID. An alternative option might be to track unroll/pending unroll memory based on block ID rather than task attempt ID.

Author: Adam Budde <bu...@amazon.com>

Closes #11012 from budde/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff71261b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff71261b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff71261b

Branch: refs/heads/master
Commit: ff71261b651a7b289ea2312abd6075da8b838ed9
Parents: 21112e8
Author: Adam Budde <bu...@amazon.com>
Authored: Tue Feb 2 19:35:33 2016 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Feb 2 19:35:33 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/storage/MemoryStore.scala  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff71261b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 76aaa78..024b660 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -255,8 +255,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
     val memoryGrowthFactor = 1.5
-    // Previous unroll memory held by this task, for releasing later (only at the very end)
-    val previousMemoryReserved = currentUnrollMemoryForThisTask
+    // Keep track of pending unroll memory reserved by this method.
+    var pendingMemoryReserved = 0L
     // Underlying vector for unrolling the block
     var vector = new SizeTrackingVector[Any]
 
@@ -266,6 +266,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
         s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
+    } else {
+      pendingMemoryReserved += initialMemoryThreshold
     }
 
     // Unroll this block safely, checking whether we have exceeded our threshold periodically
@@ -278,6 +280,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           if (currentSize >= memoryThreshold) {
             val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
             keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+            if (keepUnrolling) {
+              pendingMemoryReserved += amountToRequest
+            }
             // New threshold is currentSize * memoryGrowthFactor
             memoryThreshold += amountToRequest
           }
@@ -304,10 +309,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
           // release the unroll memory yet. Instead, we transfer it to pending unroll memory
           // so `tryToPut` can further transfer it to normal storage memory later.
           // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-          val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
-          unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
+          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
           pendingUnrollMemoryMap(taskAttemptId) =
-            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
+            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
         }
       } else {
         // Otherwise, if we return an iterator, we can only release the unroll memory when


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org