You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/29 06:53:48 UTC

spark git commit: [SPARK-9419] ShuffleMemoryManager and MemoryStore should track memory on a per-task, not per-thread, basis

Repository: spark
Updated Branches:
  refs/heads/master 429b2f0df -> ea49705bd


[SPARK-9419] ShuffleMemoryManager and MemoryStore should track memory on a per-task, not per-thread, basis

Spark's ShuffleMemoryManager and MemoryStore track memory on a per-thread basis, which causes problems in the handful of cases where we have tasks that use multiple threads. In PythonRDD, RRDD, ScriptTransformation, and PipedRDD we consume the input iterator in a separate thread in order to write it to an external process.  As a result, these RDD's input iterators are consumed in a different thread than the thread that created them, which can cause problems in our memory allocation tracking. For example, if allocations are performed in one thread but deallocations are performed in a separate thread then memory may be leaked or we may get errors complaining that more memory was allocated than was freed.

I think that the right way to fix this is to change our accounting to be performed on a per-task instead of per-thread basis.  Note that the current per-thread tracking has caused problems in the past; SPARK-3731 (#2668) fixes a memory leak in PythonRDD that was caused by this issue (that fix is no longer necessary as of this patch).

Author: Josh Rosen <jo...@databricks.com>

Closes #7734 from JoshRosen/memory-tracking-fixes and squashes the following commits:

b4b1702 [Josh Rosen] Propagate TaskContext to writer threads.
57c9b4e [Josh Rosen] Merge remote-tracking branch 'origin/master' into memory-tracking-fixes
ed25d3b [Josh Rosen] Address minor PR review comments
44f6497 [Josh Rosen] Fix long line.
7b0f04b [Josh Rosen] Fix ShuffleMemoryManagerSuite
f57f3f2 [Josh Rosen] More thread -> task changes
fa78ee8 [Josh Rosen] Move Executor's cleanup into Task so that TaskContext is defined when cleanup is performed
5e2f01e [Josh Rosen] Fix capitalization
1b0083b [Josh Rosen] Roll back fix in PySpark, which is no longer necessary
2e1e0f8 [Josh Rosen] Use TaskAttemptIds to track shuffle memory
c9e8e54 [Josh Rosen] Use TaskAttemptIds to track unroll memory


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea49705b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea49705b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea49705b

Branch: refs/heads/master
Commit: ea49705bd4feb2f25e1b536f0b3ddcfc72a57101
Parents: 429b2f0
Author: Josh Rosen <jo...@databricks.com>
Authored: Tue Jul 28 21:53:28 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jul 28 21:53:28 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala |  6 +-
 .../scala/org/apache/spark/api/r/RRDD.scala     |  2 +
 .../org/apache/spark/executor/Executor.scala    |  4 -
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  1 +
 .../scala/org/apache/spark/scheduler/Task.scala | 15 +++-
 .../spark/shuffle/ShuffleMemoryManager.scala    | 88 +++++++++---------
 .../org/apache/spark/storage/MemoryStore.scala  | 95 +++++++++++---------
 .../shuffle/ShuffleMemoryManagerSuite.scala     | 41 ++++++---
 .../spark/storage/BlockManagerSuite.scala       | 84 ++++++++---------
 9 files changed, 184 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 598953a..55e563e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -207,6 +207,7 @@ private[spark] class PythonRDD(
 
     override def run(): Unit = Utils.logUncaughtExceptions {
       try {
+        TaskContext.setTaskContext(context)
         val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
         val dataOut = new DataOutputStream(stream)
         // Partition index
@@ -263,11 +264,6 @@ private[spark] class PythonRDD(
           if (!worker.isClosed) {
             Utils.tryLog(worker.shutdownOutput())
           }
-      } finally {
-        // Release memory used by this thread for shuffles
-        env.shuffleMemoryManager.releaseMemoryForThisThread()
-        // Release memory used by this thread for unrolling blocks
-        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 23a470d..1cf2824f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -112,6 +112,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
     partition: Int): Unit = {
 
     val env = SparkEnv.get
+    val taskContext = TaskContext.get()
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val stream = new BufferedOutputStream(output, bufferSize)
 
@@ -119,6 +120,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
       override def run(): Unit = {
         try {
           SparkEnv.set(env)
+          TaskContext.setTaskContext(taskContext)
           val dataOut = new DataOutputStream(stream)
           dataOut.writeInt(partition)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e76664f..7bc7fce 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -313,10 +313,6 @@ private[spark] class Executor(
           }
 
       } finally {
-        // Release memory used by this thread for shuffles
-        env.shuffleMemoryManager.releaseMemoryForThisThread()
-        // Release memory used by this thread for unrolling blocks
-        env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
         runningTasks.remove(taskId)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index defdabf..3bb9998 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -133,6 +133,7 @@ private[spark] class PipedRDD[T: ClassTag](
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + command) {
       override def run() {
+        TaskContext.setTaskContext(context)
         val out = new PrintWriter(proc.getOutputStream)
 
         // scalastyle:off println

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d11a009..1978305 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.{TaskContextImpl, TaskContext}
+import org.apache.spark.{SparkEnv, TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.unsafe.memory.TaskMemoryManager
@@ -86,7 +86,18 @@ private[spark] abstract class Task[T](
       (runTask(context), context.collectAccumulators())
     } finally {
       context.markTaskCompleted()
-      TaskContext.unset()
+      try {
+        Utils.tryLogNonFatalError {
+          // Release memory used by this thread for shuffles
+          SparkEnv.get.shuffleMemoryManager.releaseMemoryForThisTask()
+        }
+        Utils.tryLogNonFatalError {
+          // Release memory used by this thread for unrolling blocks
+          SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+        }
+      } finally {
+        TaskContext.unset()
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index 3bcc717..f038b72 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -19,95 +19,101 @@ package org.apache.spark.shuffle
 
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, SparkException, SparkConf}
+import org.apache.spark.{Logging, SparkException, SparkConf, TaskContext}
 
 /**
- * Allocates a pool of memory to task threads for use in shuffle operations. Each disk-spilling
+ * Allocates a pool of memory to tasks for use in shuffle operations. Each disk-spilling
  * collection (ExternalAppendOnlyMap or ExternalSorter) used by these tasks can acquire memory
  * from this pool and release it as it spills data out. When a task ends, all its memory will be
  * released by the Executor.
  *
- * This class tries to ensure that each thread gets a reasonable share of memory, instead of some
- * thread ramping up to a large amount first and then causing others to spill to disk repeatedly.
- * If there are N threads, it ensures that each thread can acquire at least 1 / 2N of the memory
+ * This class tries to ensure that each task gets a reasonable share of memory, instead of some
+ * task ramping up to a large amount first and then causing others to spill to disk repeatedly.
+ * If there are N tasks, it ensures that each tasks can acquire at least 1 / 2N of the memory
  * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
- * set of active threads and redo the calculations of 1 / 2N and 1 / N in waiting threads whenever
+ * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
  * this set changes. This is all done by synchronizing access on "this" to mutate state and using
  * wait() and notifyAll() to signal changes.
  */
 private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging {
-  private val threadMemory = new mutable.HashMap[Long, Long]()  // threadId -> memory bytes
+  private val taskMemory = new mutable.HashMap[Long, Long]()  // taskAttemptId -> memory bytes
 
   def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))
 
+  private def currentTaskAttemptId(): Long = {
+    // In case this is called on the driver, return an invalid task attempt id.
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
+  }
+
   /**
-   * Try to acquire up to numBytes memory for the current thread, and return the number of bytes
+   * Try to acquire up to numBytes memory for the current task, and return the number of bytes
    * obtained, or 0 if none can be allocated. This call may block until there is enough free memory
-   * in some situations, to make sure each thread has a chance to ramp up to at least 1 / 2N of the
-   * total memory pool (where N is the # of active threads) before it is forced to spill. This can
-   * happen if the number of threads increases but an older thread had a lot of memory already.
+   * in some situations, to make sure each task has a chance to ramp up to at least 1 / 2N of the
+   * total memory pool (where N is the # of active tasks) before it is forced to spill. This can
+   * happen if the number of tasks increases but an older task had a lot of memory already.
    */
   def tryToAcquire(numBytes: Long): Long = synchronized {
-    val threadId = Thread.currentThread().getId
+    val taskAttemptId = currentTaskAttemptId()
     assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
 
-    // Add this thread to the threadMemory map just so we can keep an accurate count of the number
-    // of active threads, to let other threads ramp down their memory in calls to tryToAcquire
-    if (!threadMemory.contains(threadId)) {
-      threadMemory(threadId) = 0L
-      notifyAll()  // Will later cause waiting threads to wake up and check numThreads again
+    // Add this task to the taskMemory map just so we can keep an accurate count of the number
+    // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
+    if (!taskMemory.contains(taskAttemptId)) {
+      taskMemory(taskAttemptId) = 0L
+      notifyAll()  // Will later cause waiting tasks to wake up and check numThreads again
     }
 
     // Keep looping until we're either sure that we don't want to grant this request (because this
-    // thread would have more than 1 / numActiveThreads of the memory) or we have enough free
-    // memory to give it (we always let each thread get at least 1 / (2 * numActiveThreads)).
+    // task would have more than 1 / numActiveTasks of the memory) or we have enough free
+    // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
     while (true) {
-      val numActiveThreads = threadMemory.keys.size
-      val curMem = threadMemory(threadId)
-      val freeMemory = maxMemory - threadMemory.values.sum
+      val numActiveTasks = taskMemory.keys.size
+      val curMem = taskMemory(taskAttemptId)
+      val freeMemory = maxMemory - taskMemory.values.sum
 
-      // How much we can grant this thread; don't let it grow to more than 1 / numActiveThreads;
+      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
       // don't let it be negative
-      val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveThreads) - curMem))
+      val maxToGrant = math.min(numBytes, math.max(0, (maxMemory / numActiveTasks) - curMem))
 
-      if (curMem < maxMemory / (2 * numActiveThreads)) {
-        // We want to let each thread get at least 1 / (2 * numActiveThreads) before blocking;
-        // if we can't give it this much now, wait for other threads to free up memory
-        // (this happens if older threads allocated lots of memory before N grew)
-        if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * numActiveThreads) - curMem)) {
+      if (curMem < maxMemory / (2 * numActiveTasks)) {
+        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+        // if we can't give it this much now, wait for other tasks to free up memory
+        // (this happens if older tasks allocated lots of memory before N grew)
+        if (freeMemory >= math.min(maxToGrant, maxMemory / (2 * numActiveTasks) - curMem)) {
           val toGrant = math.min(maxToGrant, freeMemory)
-          threadMemory(threadId) += toGrant
+          taskMemory(taskAttemptId) += toGrant
           return toGrant
         } else {
-          logInfo(s"Thread $threadId waiting for at least 1/2N of shuffle memory pool to be free")
+          logInfo(
+            s"Thread $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free")
           wait()
         }
       } else {
         // Only give it as much memory as is free, which might be none if it reached 1 / numThreads
         val toGrant = math.min(maxToGrant, freeMemory)
-        threadMemory(threadId) += toGrant
+        taskMemory(taskAttemptId) += toGrant
         return toGrant
       }
     }
     0L  // Never reached
   }
 
-  /** Release numBytes bytes for the current thread. */
+  /** Release numBytes bytes for the current task. */
   def release(numBytes: Long): Unit = synchronized {
-    val threadId = Thread.currentThread().getId
-    val curMem = threadMemory.getOrElse(threadId, 0L)
+    val taskAttemptId = currentTaskAttemptId()
+    val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
     if (curMem < numBytes) {
       throw new SparkException(
-        s"Internal error: release called on ${numBytes} bytes but thread only has ${curMem}")
+        s"Internal error: release called on ${numBytes} bytes but task only has ${curMem}")
     }
-    threadMemory(threadId) -= numBytes
+    taskMemory(taskAttemptId) -= numBytes
     notifyAll()  // Notify waiters who locked "this" in tryToAcquire that memory has been freed
   }
 
-  /** Release all memory for the current thread and mark it as inactive (e.g. when a task ends). */
-  def releaseMemoryForThisThread(): Unit = synchronized {
-    val threadId = Thread.currentThread().getId
-    threadMemory.remove(threadId)
+  /** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */
+  def releaseMemoryForThisTask(): Unit = synchronized {
+    val taskAttemptId = currentTaskAttemptId()
+    taskMemory.remove(taskAttemptId)
     notifyAll()  // Notify waiters who locked "this" in tryToAcquire that memory has been freed
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index ed60977..6f27f00 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -23,6 +23,7 @@ import java.util.LinkedHashMap
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.TaskContext
 import org.apache.spark.util.{SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
 
@@ -43,11 +44,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
   private val accountingLock = new Object
 
-  // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
+  // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on `accountingLock`
   private val unrollMemoryMap = mutable.HashMap[Long, Long]()
   // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
-  // Pending unroll memory refers to the intermediate memory occupied by a thread
+  // Pending unroll memory refers to the intermediate memory occupied by a task
   // after the unroll but before the actual putting of the block in the cache.
   // This chunk of memory is expected to be released *as soon as* we finish
   // caching the corresponding block as opposed to until after the task finishes.
@@ -250,21 +251,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     var elementsUnrolled = 0
     // Whether there is still enough memory for us to continue unrolling this block
     var keepUnrolling = true
-    // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
+    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
     val initialMemoryThreshold = unrollMemoryThreshold
     // How often to check whether we need to request more memory
     val memoryCheckPeriod = 16
-    // Memory currently reserved by this thread for this particular unrolling operation
+    // Memory currently reserved by this task for this particular unrolling operation
     var memoryThreshold = initialMemoryThreshold
     // Memory to request as a multiple of current vector size
     val memoryGrowthFactor = 1.5
-    // Previous unroll memory held by this thread, for releasing later (only at the very end)
-    val previousMemoryReserved = currentUnrollMemoryForThisThread
+    // Previous unroll memory held by this task, for releasing later (only at the very end)
+    val previousMemoryReserved = currentUnrollMemoryForThisTask
     // Underlying vector for unrolling the block
     var vector = new SizeTrackingVector[Any]
 
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
+    keepUnrolling = reserveUnrollMemoryForThisTask(initialMemoryThreshold)
 
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -283,7 +284,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
             // Hold the accounting lock, in case another thread concurrently puts a block that
             // takes up the unrolling space we just ensured here
             accountingLock.synchronized {
-              if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
+              if (!reserveUnrollMemoryForThisTask(amountToRequest)) {
                 // If the first request is not granted, try again after ensuring free space
                 // If there is still not enough space, give up and drop the partition
                 val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
@@ -291,7 +292,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
                   val result = ensureFreeSpace(blockId, spaceToEnsure)
                   droppedBlocks ++= result.droppedBlocks
                 }
-                keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
+                keepUnrolling = reserveUnrollMemoryForThisTask(amountToRequest)
               }
             }
             // New threshold is currentSize * memoryGrowthFactor
@@ -317,9 +318,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       // later when the task finishes.
       if (keepUnrolling) {
         accountingLock.synchronized {
-          val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
-          releaseUnrollMemoryForThisThread(amountToRelease)
-          reservePendingUnrollMemoryForThisThread(amountToRelease)
+          val amountToRelease = currentUnrollMemoryForThisTask - previousMemoryReserved
+          releaseUnrollMemoryForThisTask(amountToRelease)
+          reservePendingUnrollMemoryForThisTask(amountToRelease)
         }
       }
     }
@@ -397,7 +398,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
       // Release the unroll memory used because we no longer need the underlying Array
-      releasePendingUnrollMemoryForThisThread()
+      releasePendingUnrollMemoryForThisTask()
     }
     ResultWithDroppedBlocks(putSuccess, droppedBlocks)
   }
@@ -427,9 +428,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
 
     // Take into account the amount of memory currently occupied by unrolling blocks
     // and minus the pending unroll memory for that block on current thread.
-    val threadId = Thread.currentThread().getId
+    val taskAttemptId = currentTaskAttemptId()
     val actualFreeMemory = freeMemory - currentUnrollMemory +
-      pendingUnrollMemoryMap.getOrElse(threadId, 0L)
+      pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L)
 
     if (actualFreeMemory < space) {
       val rddToAdd = getRddId(blockIdToAdd)
@@ -455,7 +456,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
         logInfo(s"${selectedBlocks.size} blocks selected for dropping")
         for (blockId <- selectedBlocks) {
           val entry = entries.synchronized { entries.get(blockId) }
-          // This should never be null as only one thread should be dropping
+          // This should never be null as only one task should be dropping
           // blocks and removing entries. However the check is still here for
           // future safety.
           if (entry != null) {
@@ -482,79 +483,85 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     entries.synchronized { entries.containsKey(blockId) }
   }
 
+  private def currentTaskAttemptId(): Long = {
+    // In case this is called on the driver, return an invalid task attempt id.
+    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
+  }
+
   /**
-   * Reserve additional memory for unrolling blocks used by this thread.
+   * Reserve additional memory for unrolling blocks used by this task.
    * Return whether the request is granted.
    */
-  def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
     accountingLock.synchronized {
       val granted = freeMemory > currentUnrollMemory + memory
       if (granted) {
-        val threadId = Thread.currentThread().getId
-        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+        val taskAttemptId = currentTaskAttemptId()
+        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
       }
       granted
     }
   }
 
   /**
-   * Release memory used by this thread for unrolling blocks.
-   * If the amount is not specified, remove the current thread's allocation altogether.
+   * Release memory used by this task for unrolling blocks.
+   * If the amount is not specified, remove the current task's allocation altogether.
    */
-  def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
-    val threadId = Thread.currentThread().getId
+  def releaseUnrollMemoryForThisTask(memory: Long = -1L): Unit = {
+    val taskAttemptId = currentTaskAttemptId()
     accountingLock.synchronized {
       if (memory < 0) {
-        unrollMemoryMap.remove(threadId)
+        unrollMemoryMap.remove(taskAttemptId)
       } else {
-        unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
-        // If this thread claims no more unroll memory, release it completely
-        if (unrollMemoryMap(threadId) <= 0) {
-          unrollMemoryMap.remove(threadId)
+        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, memory) - memory
+        // If this task claims no more unroll memory, release it completely
+        if (unrollMemoryMap(taskAttemptId) <= 0) {
+          unrollMemoryMap.remove(taskAttemptId)
         }
       }
     }
   }
 
   /**
-   * Reserve the unroll memory of current unroll successful block used by this thread
+   * Reserve the unroll memory of current unroll successful block used by this task
    * until actually put the block into memory entry.
    */
-  def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
-    val threadId = Thread.currentThread().getId
+  def reservePendingUnrollMemoryForThisTask(memory: Long): Unit = {
+    val taskAttemptId = currentTaskAttemptId()
     accountingLock.synchronized {
-       pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
+       pendingUnrollMemoryMap(taskAttemptId) =
+         pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
     }
   }
 
   /**
-   * Release pending unroll memory of current unroll successful block used by this thread
+   * Release pending unroll memory of current unroll successful block used by this task
    */
-  def releasePendingUnrollMemoryForThisThread(): Unit = {
-    val threadId = Thread.currentThread().getId
+  def releasePendingUnrollMemoryForThisTask(): Unit = {
+    val taskAttemptId = currentTaskAttemptId()
     accountingLock.synchronized {
-      pendingUnrollMemoryMap.remove(threadId)
+      pendingUnrollMemoryMap.remove(taskAttemptId)
     }
   }
 
   /**
-   * Return the amount of memory currently occupied for unrolling blocks across all threads.
+   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
    */
   def currentUnrollMemory: Long = accountingLock.synchronized {
     unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
   }
 
   /**
-   * Return the amount of memory currently occupied for unrolling blocks by this thread.
+   * Return the amount of memory currently occupied for unrolling blocks by this task.
    */
-  def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
-    unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+  def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized {
+    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
   }
 
   /**
-   * Return the number of threads currently unrolling blocks.
+   * Return the number of tasks currently unrolling blocks.
    */
-  def numThreadsUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+  def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
 
   /**
    * Log information about current memory usage.
@@ -566,7 +573,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     logInfo(
       s"Memory use = ${Utils.bytesToString(blocksMemory)} (blocks) + " +
       s"${Utils.bytesToString(unrollMemory)} (scratch space shared across " +
-      s"$numThreadsUnrolling thread(s)) = ${Utils.bytesToString(totalMemory)}. " +
+      s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(totalMemory)}. " +
       s"Storage limit = ${Utils.bytesToString(maxMemory)}."
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index 96778c9..f495b6a 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -17,26 +17,39 @@
 
 package org.apache.spark.shuffle
 
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.mockito.Mockito._
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.CountDownLatch
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, TaskContext}
 
 class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
+
+  val nextTaskAttemptId = new AtomicInteger()
+
   /** Launch a thread with the given body block and return it. */
   private def startThread(name: String)(body: => Unit): Thread = {
     val thread = new Thread("ShuffleMemorySuite " + name) {
       override def run() {
-        body
+        try {
+          val taskAttemptId = nextTaskAttemptId.getAndIncrement
+          val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
+          when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
+          TaskContext.setTaskContext(mockTaskContext)
+          body
+        } finally {
+          TaskContext.unset()
+        }
       }
     }
     thread.start()
     thread
   }
 
-  test("single thread requesting memory") {
+  test("single task requesting memory") {
     val manager = new ShuffleMemoryManager(1000L)
 
     assert(manager.tryToAcquire(100L) === 100L)
@@ -50,7 +63,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     assert(manager.tryToAcquire(300L) === 300L)
     assert(manager.tryToAcquire(300L) === 200L)
 
-    manager.releaseMemoryForThisThread()
+    manager.releaseMemoryForThisTask()
     assert(manager.tryToAcquire(1000L) === 1000L)
     assert(manager.tryToAcquire(100L) === 0L)
   }
@@ -107,8 +120,8 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
   }
 
 
-  test("threads cannot grow past 1 / N") {
-    // Two threads request 250 bytes first, wait for each other to get it, and then request
+  test("tasks cannot grow past 1 / N") {
+    // Two tasks request 250 bytes first, wait for each other to get it, and then request
     // 500 more; we should only grant 250 bytes to each of them on this second request
 
     val manager = new ShuffleMemoryManager(1000L)
@@ -158,7 +171,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     assert(state.t2Result2 === 250L)
   }
 
-  test("threads can block to get at least 1 / 2N memory") {
+  test("tasks can block to get at least 1 / 2N memory") {
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
     // for a bit and releases 250 bytes, which should then be granted to t2. Further requests
     // by t2 will return false right away because it now has 1 / 2N of the memory.
@@ -224,7 +237,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     }
   }
 
-  test("releaseMemoryForThisThread") {
+  test("releaseMemoryForThisTask") {
     // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. It sleeps
     // for a bit and releases all its memory. t2 should now be able to grab all the memory.
 
@@ -251,9 +264,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
         }
       }
       // Sleep a bit before releasing our memory; this is hacky but it would be difficult to make
-      // sure the other thread blocks for some time otherwise
+      // sure the other task blocks for some time otherwise
       Thread.sleep(300)
-      manager.releaseMemoryForThisThread()
+      manager.releaseMemoryForThisTask()
     }
 
     val t2 = startThread("t2") {
@@ -282,7 +295,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
       t2.join()
     }
 
-    // Both threads should've been able to acquire their memory; the second one will have waited
+    // Both tasks should've been able to acquire their memory; the second one will have waited
     // until the first one acquired 1000 bytes and then released all of it
     state.synchronized {
       assert(state.t1Result === 1000L, "t1 could not allocate memory")
@@ -293,7 +306,7 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
     }
   }
 
-  test("threads should not be granted a negative size") {
+  test("tasks should not be granted a negative size") {
     val manager = new ShuffleMemoryManager(1000L)
     manager.tryToAcquire(700L)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ea49705b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bcee901..f480fd1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1004,32 +1004,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     assert(memoryStore.currentUnrollMemory === 0)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Reserve
-    memoryStore.reserveUnrollMemoryForThisThread(100)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 100)
-    memoryStore.reserveUnrollMemoryForThisThread(200)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 300)
-    memoryStore.reserveUnrollMemoryForThisThread(500)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 800)
-    memoryStore.reserveUnrollMemoryForThisThread(1000000)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
+    memoryStore.reserveUnrollMemoryForThisTask(100)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 100)
+    memoryStore.reserveUnrollMemoryForThisTask(200)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 300)
+    memoryStore.reserveUnrollMemoryForThisTask(500)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 800)
+    memoryStore.reserveUnrollMemoryForThisTask(1000000)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
     // Release
-    memoryStore.releaseUnrollMemoryForThisThread(100)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 700)
-    memoryStore.releaseUnrollMemoryForThisThread(100)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+    memoryStore.releaseUnrollMemoryForThisTask(100)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 700)
+    memoryStore.releaseUnrollMemoryForThisTask(100)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 600)
     // Reserve again
-    memoryStore.reserveUnrollMemoryForThisThread(4400)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
-    memoryStore.reserveUnrollMemoryForThisThread(20000)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
+    memoryStore.reserveUnrollMemoryForThisTask(4400)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 5000)
+    memoryStore.reserveUnrollMemoryForThisTask(20000)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
     // Release again
-    memoryStore.releaseUnrollMemoryForThisThread(1000)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
-    memoryStore.releaseUnrollMemoryForThisThread() // release all
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    memoryStore.releaseUnrollMemoryForThisTask(1000)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
+    memoryStore.releaseUnrollMemoryForThisTask() // release all
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
   }
 
   /**
@@ -1060,24 +1060,24 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val bigList = List.fill(40)(new Array[Byte](1000))
     val memoryStore = store.memoryStore
     val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with all the space in the world. This should succeed and return an array.
     var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
-    memoryStore.releasePendingUnrollMemoryForThisThread()
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+    memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll with not enough space. This should succeed after kicking out someBlock1.
     store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
     store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
     verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     assert(droppedBlocks.size === 1)
     assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
     droppedBlocks.clear()
-    memoryStore.releasePendingUnrollMemoryForThisThread()
+    memoryStore.releasePendingUnrollMemoryForThisTask()
 
     // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
     // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
@@ -1085,7 +1085,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
     unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
     verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
-    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
     assert(droppedBlocks.size === 1)
     assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
     droppedBlocks.clear()
@@ -1099,7 +1099,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val bigList = List.fill(40)(new Array[Byte](1000))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll with plenty of space. This should succeed and cache both blocks.
     val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
@@ -1110,7 +1110,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(result2.size > 0)
     assert(result1.data.isLeft) // unroll did not drop this block to disk
     assert(result2.data.isLeft)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Re-put these two blocks so block manager knows about them too. Otherwise, block manager
     // would not know how to drop them from memory later.
@@ -1126,7 +1126,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!memoryStore.contains("b1"))
     assert(memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.remove("b3")
     store.putIterator("b3", smallIterator, memOnly)
 
@@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
-    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
   }
 
   /**
@@ -1153,7 +1153,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val bigList = List.fill(40)(new Array[Byte](1000))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
     def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     store.putIterator("b1", smallIterator, memAndDisk)
     store.putIterator("b2", smallIterator, memAndDisk)
@@ -1170,7 +1170,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(!diskStore.contains("b3"))
     memoryStore.remove("b3")
     store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll huge block with not enough space. This should fail and drop the new block to disk
     // directly in addition to kicking out b2 in the process. Memory store should contain only
@@ -1186,7 +1186,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(diskStore.contains("b2"))
     assert(!diskStore.contains("b3"))
     assert(diskStore.contains("b4"))
-    assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+    assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
   }
 
   test("multiple unrolls by the same thread") {
@@ -1195,32 +1195,32 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     val memoryStore = store.memoryStore
     val smallList = List.fill(40)(new Array[Byte](100))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // All unroll memory used is released because unrollSafely returned an array
     memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
     memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
-    assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+    assert(memoryStore.currentUnrollMemoryForThisTask === 0)
 
     // Unroll memory is not released because unrollSafely returned an iterator
     // that still depends on the underlying vector used in the process
     memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
-    val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+    val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB3 > 0)
 
     // The unroll memory owned by this thread builds on top of its value after the previous unrolls
     memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
-    val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+    val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
 
     // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
     memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
-    val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+    val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisTask
     memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
-    val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+    val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisTask
     memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
-    val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+    val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisTask
     assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org