You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/04/21 19:02:28 UTC

spark git commit: [SPARK-4452] [CORE] Shuffle data structures can starve others on the same thread for memory

Repository: spark
Updated Branches:
  refs/heads/master 649335d6c -> 4f369176b


[SPARK-4452] [CORE] Shuffle data structures can starve others on the same thread for memory

## What changes were proposed in this pull request?
In #9241, a mechanism was implemented to call spill() on those SQL operators that support spilling when there is not enough memory for execution.
However, ExternalSorter and AppendOnlyMap in Spark core do not work with that mechanism, so this PR makes them benefit from #9241. Now, when there is not enough memory for execution, memory can be obtained by spilling ExternalSorter and AppendOnlyMap in Spark core.

## How was this patch tested?
Added two unit tests for it: one in ExternalAppendOnlyMapSuite and one in ExternalSorterSuite.

Author: Lianhui Wang <li...@gmail.com>

Closes #10024 from lianhuiwang/SPARK-4452-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f369176
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f369176
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f369176

Branch: refs/heads/master
Commit: 4f369176b750c980682a6be468cefa8627769c72
Parents: 649335d
Author: Lianhui Wang <li...@gmail.com>
Authored: Thu Apr 21 10:02:23 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Thu Apr 21 10:02:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryConsumer.java |  19 ++-
 .../apache/spark/memory/TaskMemoryManager.java  |   7 +
 .../util/collection/ExternalAppendOnlyMap.scala | 111 ++++++++++++--
 .../spark/util/collection/ExternalSorter.scala  | 150 ++++++++++++++++---
 .../spark/util/collection/Spillable.scala       |  48 ++++--
 .../collection/ExternalAppendOnlyMapSuite.scala |  15 ++
 .../util/collection/ExternalSorterSuite.scala   |  17 +++
 project/MimaExcludes.scala                      |   3 +
 8 files changed, 324 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 36138cc..840f13b 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -45,7 +45,7 @@ public abstract class MemoryConsumer {
   /**
    * Returns the size of used memory in bytes.
    */
-  long getUsed() {
+  protected long getUsed() {
     return used;
   }
 
@@ -130,4 +130,21 @@ public abstract class MemoryConsumer {
     used -= page.size();
     taskMemoryManager.freePage(page, this);
   }
+
+  /**
+   * Allocates a heap memory of `size`.
+   */
+  public long acquireOnHeapMemory(long size) {
+    long granted = taskMemoryManager.acquireExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used += granted;
+    return granted;
+  }
+
+  /**
+   * Release N bytes of heap memory.
+   */
+  public void freeOnHeapMemory(long size) {
+    taskMemoryManager.releaseExecutionMemory(size, MemoryMode.ON_HEAP, this);
+    used -= size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 9044bb4..6b7d9aa 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -408,4 +408,11 @@ public class TaskMemoryManager {
   public long getMemoryConsumptionForThisTask() {
     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
   }
+
+  /**
+   * Returns Tungsten memory mode
+   */
+  public MemoryMode getTungstenMemoryMode(){
+    return tungstenMemoryMode;
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 95351e9..fc71f83 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -61,10 +61,10 @@ class ExternalAppendOnlyMap[K, V, C](
     blockManager: BlockManager = SparkEnv.get.blockManager,
     context: TaskContext = TaskContext.get(),
     serializerManager: SerializerManager = SparkEnv.get.serializerManager)
-  extends Iterable[(K, C)]
+  extends Spillable[SizeTracker](context.taskMemoryManager())
   with Serializable
   with Logging
-  with Spillable[SizeTracker] {
+  with Iterable[(K, C)] {
 
   if (context == null) {
     throw new IllegalStateException(
@@ -81,9 +81,7 @@ class ExternalAppendOnlyMap[K, V, C](
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }
 
-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
-
-  private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
@@ -117,6 +115,8 @@ class ExternalAppendOnlyMap[K, V, C](
   private val keyComparator = new HashComparator[K]
   private val ser = serializer.newInstance()
 
+  @volatile private var readingIterator: SpillableIterator = null
+
   /**
    * Number of files this map has spilled so far.
    * Exposed for testing.
@@ -182,6 +182,29 @@ class ExternalAppendOnlyMap[K, V, C](
    * Sort the existing contents of the in-memory map and spill them to a temporary file on disk.
    */
   override protected[this] def spill(collection: SizeTracker): Unit = {
+    val inMemoryIterator = currentMap.destructiveSortedIterator(keyComparator)
+    val diskMapIterator = spillMemoryIteratorToDisk(inMemoryIterator)
+    spilledMaps.append(diskMapIterator)
+  }
+
+  /**
+   * Force to spilling the current in-memory collection to disk to release memory,
+   * It will be called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  override protected[this] def forceSpill(): Boolean = {
+    assert(readingIterator != null)
+    val isSpilled = readingIterator.spill()
+    if (isSpilled) {
+      currentMap = null
+    }
+    isSpilled
+  }
+
+  /**
+   * Spill the in-memory Iterator to a temporary file on disk.
+   */
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: Iterator[(K, C)])
+      : DiskMapIterator = {
     val (blockId, file) = diskBlockManager.createTempLocalBlock()
     curWriteMetrics = new ShuffleWriteMetrics()
     var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics)
@@ -202,9 +225,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     var success = false
     try {
-      val it = currentMap.destructiveSortedIterator(keyComparator)
-      while (it.hasNext) {
-        val kv = it.next()
+      while (inMemoryIterator.hasNext) {
+        val kv = inMemoryIterator.next()
         writer.write(kv._1, kv._2)
         objectsWritten += 1
 
@@ -237,7 +259,17 @@ class ExternalAppendOnlyMap[K, V, C](
       }
     }
 
-    spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
+    new DiskMapIterator(file, blockId, batchSizes)
+  }
+
+  /**
+   * Returns a destructive iterator for iterating over the entries of this map.
+   * If this iterator is forced spill to disk to release memory when there is not enough memory,
+   * it returns pairs from an on-disk map.
+   */
+  def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
+    readingIterator = new SpillableIterator(inMemoryIterator)
+    readingIterator
   }
 
   /**
@@ -250,15 +282,18 @@ class ExternalAppendOnlyMap[K, V, C](
         "ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
     }
     if (spilledMaps.isEmpty) {
-      CompletionIterator[(K, C), Iterator[(K, C)]](currentMap.iterator, freeCurrentMap())
+      CompletionIterator[(K, C), Iterator[(K, C)]](
+        destructiveIterator(currentMap.iterator), freeCurrentMap())
     } else {
       new ExternalIterator()
     }
   }
 
   private def freeCurrentMap(): Unit = {
-    currentMap = null // So that the memory can be garbage-collected
-    releaseMemory()
+    if (currentMap != null) {
+      currentMap = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
   }
 
   /**
@@ -272,8 +307,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](
-      currentMap.destructiveSortedIterator(keyComparator), freeCurrentMap())
+    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
+      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -532,8 +567,56 @@ class ExternalAppendOnlyMap[K, V, C](
     context.addTaskCompletionListener(context => cleanup())
   }
 
+  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+    extends Iterator[(K, C)] {
+
+    private val SPILL_LOCK = new Object()
+
+    private var nextUpstream: Iterator[(K, C)] = null
+
+    private var cur: (K, C) = readNext()
+
+    private var hasSpilled: Boolean = false
+
+    def spill(): Boolean = SPILL_LOCK.synchronized {
+      if (hasSpilled) {
+        false
+      } else {
+        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+          s"it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+        nextUpstream = spillMemoryIteratorToDisk(upstream)
+        hasSpilled = true
+        true
+      }
+    }
+
+    def readNext(): (K, C) = SPILL_LOCK.synchronized {
+      if (nextUpstream != null) {
+        upstream = nextUpstream
+        nextUpstream = null
+      }
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
+      }
+    }
+
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): (K, C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
+  }
+
   /** Convenience function to hash the given (K, C) pair by the key. */
   private def hashKey(kc: (K, C)): Int = ExternalAppendOnlyMap.hash(kc._1)
+
+  override def toString(): String = {
+    this.getClass.getName + "@" + java.lang.Integer.toHexString(this.hashCode())
+  }
 }
 
 private[spark] object ExternalAppendOnlyMap {

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 916053f..4067ace 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -93,10 +93,8 @@ private[spark] class ExternalSorter[K, V, C](
     partitioner: Option[Partitioner] = None,
     ordering: Option[Ordering[K]] = None,
     serializer: Serializer = SparkEnv.get.serializer)
-  extends Logging
-  with Spillable[WritablePartitionedPairCollection[K, C]] {
-
-  override protected[this] def taskMemoryManager: TaskMemoryManager = context.taskMemoryManager()
+  extends Spillable[WritablePartitionedPairCollection[K, C]](context.taskMemoryManager())
+  with Logging {
 
   private val conf = SparkEnv.get.conf
 
@@ -126,8 +124,8 @@ private[spark] class ExternalSorter[K, V, C](
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
   // store them in an array buffer.
-  private var map = new PartitionedAppendOnlyMap[K, C]
-  private var buffer = new PartitionedPairBuffer[K, C]
+  @volatile private var map = new PartitionedAppendOnlyMap[K, C]
+  @volatile private var buffer = new PartitionedPairBuffer[K, C]
 
   // Total spilling statistics
   private var _diskBytesSpilled = 0L
@@ -137,6 +135,10 @@ private[spark] class ExternalSorter[K, V, C](
   private var _peakMemoryUsedBytes: Long = 0L
   def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
 
+  @volatile private var isShuffleSort: Boolean = true
+  private val forceSpillFiles = new ArrayBuffer[SpilledFile]
+  @volatile private var readingIterator: SpillableIterator = null
+
   // A comparator for keys K that orders them within a partition to allow aggregation or sorting.
   // Can be a partial ordering by hash code if a total ordering is not provided through by the
   // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some
@@ -235,6 +237,34 @@ private[spark] class ExternalSorter[K, V, C](
    * @param collection whichever collection we're using (map or buffer)
    */
   override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
+    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
+    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+    spills.append(spillFile)
+  }
+
+  /**
+   * Force to spilling the current in-memory collection to disk to release memory,
+   * It will be called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  override protected[this] def forceSpill(): Boolean = {
+    if (isShuffleSort) {
+      false
+    } else {
+      assert(readingIterator != null)
+      val isSpilled = readingIterator.spill()
+      if (isSpilled) {
+        map = null
+        buffer = null
+      }
+      isSpilled
+    }
+  }
+
+  /**
+   * Spill contents of in-memory iterator to a temporary file on disk.
+   */
+  private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
+      : SpilledFile = {
     // Because these files may be read during shuffle, their compression must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more context.
@@ -271,12 +301,11 @@ private[spark] class ExternalSorter[K, V, C](
 
     var success = false
     try {
-      val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
-      while (it.hasNext) {
-        val partitionId = it.nextPartition()
+      while (inMemoryIterator.hasNext) {
+        val partitionId = inMemoryIterator.nextPartition()
         require(partitionId >= 0 && partitionId < numPartitions,
           s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
-        it.writeNext(writer)
+        inMemoryIterator.writeNext(writer)
         elementsPerPartition(partitionId) += 1
         objectsWritten += 1
 
@@ -308,7 +337,7 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }
 
-    spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition))
+    SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
   }
 
   /**
@@ -600,6 +629,20 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   /**
+   * Returns a destructive iterator for iterating over the entries of this map.
+   * If this iterator is forced spill to disk to release memory when there is not enough memory,
+   * it returns pairs from an on-disk map.
+   */
+  def destructiveIterator(memoryIterator: Iterator[((Int, K), C)]): Iterator[((Int, K), C)] = {
+    if (isShuffleSort) {
+      memoryIterator
+    } else {
+      readingIterator = new SpillableIterator(memoryIterator)
+      readingIterator
+    }
+  }
+
+  /**
    * Return an iterator over all the data written to this object, grouped by partition and
    * aggregated by the requested aggregator. For each partition we then have an iterator over its
    * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
@@ -618,21 +661,26 @@ private[spark] class ExternalSorter[K, V, C](
       // we don't even need to sort by anything other than partition ID
       if (!ordering.isDefined) {
         // The user hasn't requested sorted keys, so only sort by partition ID, not key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(None))
+        groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
       } else {
         // We do need to sort by both partition ID and key
-        groupByPartition(collection.partitionedDestructiveSortedIterator(Some(keyComparator)))
+        groupByPartition(destructiveIterator(
+          collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
       }
     } else {
       // Merge spilled and in-memory data
-      merge(spills, collection.partitionedDestructiveSortedIterator(comparator))
+      merge(spills, destructiveIterator(
+        collection.partitionedDestructiveSortedIterator(comparator)))
     }
   }
 
   /**
    * Return an iterator over all the data written to this object, aggregated by our aggregator.
    */
-  def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+  def iterator: Iterator[Product2[K, C]] = {
+    isShuffleSort = false
+    partitionedIterator.flatMap(pair => pair._2)
+  }
 
   /**
    * Write all the data added into this ExternalSorter into a file in the disk store. This is
@@ -689,11 +737,15 @@ private[spark] class ExternalSorter[K, V, C](
   }
 
   def stop(): Unit = {
-    map = null // So that the memory can be garbage-collected
-    buffer = null // So that the memory can be garbage-collected
     spills.foreach(s => s.file.delete())
     spills.clear()
-    releaseMemory()
+    forceSpillFiles.foreach(s => s.file.delete())
+    forceSpillFiles.clear()
+    if (map != null || buffer != null) {
+      map = null // So that the memory can be garbage-collected
+      buffer = null // So that the memory can be garbage-collected
+      releaseMemory()
+    }
   }
 
   /**
@@ -727,4 +779,66 @@ private[spark] class ExternalSorter[K, V, C](
       (elem._1._2, elem._2)
     }
   }
+
+  private[this] class SpillableIterator(var upstream: Iterator[((Int, K), C)])
+    extends Iterator[((Int, K), C)] {
+
+    private val SPILL_LOCK = new Object()
+
+    private var nextUpstream: Iterator[((Int, K), C)] = null
+
+    private var cur: ((Int, K), C) = readNext()
+
+    private var hasSpilled: Boolean = false
+
+    def spill(): Boolean = SPILL_LOCK.synchronized {
+      if (hasSpilled) {
+        false
+      } else {
+        val inMemoryIterator = new WritablePartitionedIterator {
+          private[this] var cur = if (upstream.hasNext) upstream.next() else null
+
+          def writeNext(writer: DiskBlockObjectWriter): Unit = {
+            writer.write(cur._1._2, cur._2)
+            cur = if (upstream.hasNext) upstream.next() else null
+          }
+
+          def hasNext(): Boolean = cur != null
+
+          def nextPartition(): Int = cur._1._1
+        }
+        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
+          s" it will release ${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
+        val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
+        forceSpillFiles.append(spillFile)
+        val spillReader = new SpillReader(spillFile)
+        nextUpstream = (0 until numPartitions).iterator.flatMap { p =>
+          val iterator = spillReader.readNextPartition()
+          iterator.map(cur => ((p, cur._1), cur._2))
+        }
+        hasSpilled = true
+        true
+      }
+    }
+
+    def readNext(): ((Int, K), C) = SPILL_LOCK.synchronized {
+      if (nextUpstream != null) {
+        upstream = nextUpstream
+        nextUpstream = null
+      }
+      if (upstream.hasNext) {
+        upstream.next()
+      } else {
+        null
+      }
+    }
+
+    override def hasNext(): Boolean = cur != null
+
+    override def next(): ((Int, K), C) = {
+      val r = cur
+      cur = readNext()
+      r
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 25ca203..aee6399 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -19,13 +19,14 @@ package org.apache.spark.util.collection
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
+import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
 
 /**
  * Spills contents of an in-memory collection to disk when the memory threshold
  * has been exceeded.
  */
-private[spark] trait Spillable[C] extends Logging {
+private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
+  extends MemoryConsumer(taskMemoryManager) with Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *
@@ -33,16 +34,19 @@ private[spark] trait Spillable[C] extends Logging {
    */
   protected def spill(collection: C): Unit
 
+  /**
+   * Force to spilling the current in-memory collection to disk to release memory,
+   * It will be called by TaskMemoryManager when there is not enough memory for the task.
+   */
+  protected def forceSpill(): Boolean
+
   // Number of elements read from input since last spill
-  protected def elementsRead: Long = _elementsRead
+  @volatile protected def elementsRead: Long = _elementsRead
 
   // Called by subclasses every time a record is read
   // It's used for checking spilling frequency
   protected def addElementsRead(): Unit = { _elementsRead += 1 }
 
-  // Memory manager that can be used to acquire/release memory
-  protected[this] def taskMemoryManager: TaskMemoryManager
-
   // Initial threshold for the size of a collection before we start tracking its memory usage
   // For testing only
   private[this] val initialMemoryThreshold: Long =
@@ -55,13 +59,13 @@ private[spark] trait Spillable[C] extends Logging {
 
   // Threshold for this collection's size in bytes before we start tracking its memory usage
   // To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
-  private[this] var myMemoryThreshold = initialMemoryThreshold
+  @volatile private[this] var myMemoryThreshold = initialMemoryThreshold
 
   // Number of elements read from input since last spill
   private[this] var _elementsRead = 0L
 
   // Number of bytes spilled in total
-  private[this] var _memoryBytesSpilled = 0L
+  @volatile private[this] var _memoryBytesSpilled = 0L
 
   // Number of spills
   private[this] var _spillCount = 0
@@ -79,8 +83,7 @@ private[spark] trait Spillable[C] extends Logging {
     if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
       // Claim up to double our current memory from the shuffle memory pool
       val amountToRequest = 2 * currentMemory - myMemoryThreshold
-      val granted =
-        taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
+      val granted = acquireOnHeapMemory(amountToRequest)
       myMemoryThreshold += granted
       // If we were granted too little memory to grow further (either tryToAcquire returned 0,
       // or we already had more memory than myMemoryThreshold), spill the current collection
@@ -100,6 +103,27 @@ private[spark] trait Spillable[C] extends Logging {
   }
 
   /**
+   * Spill some data to disk to release memory, which will be called by TaskMemoryManager
+   * when there is not enough memory for the task.
+   */
+  override def spill(size: Long, trigger: MemoryConsumer): Long = {
+    if (trigger != this && taskMemoryManager.getTungstenMemoryMode == MemoryMode.ON_HEAP) {
+      val isSpilled = forceSpill()
+      if (!isSpilled) {
+        0L
+      } else {
+        _elementsRead = 0
+        val freeMemory = myMemoryThreshold - initialMemoryThreshold
+        _memoryBytesSpilled += freeMemory
+        releaseMemory()
+        freeMemory
+      }
+    } else {
+      0L
+    }
+  }
+
+  /**
    * @return number of bytes spilled in total
    */
   def memoryBytesSpilled: Long = _memoryBytesSpilled
@@ -108,9 +132,7 @@ private[spark] trait Spillable[C] extends Logging {
    * Release our memory back to the execution pool so that other tasks can grab it.
    */
   def releaseMemory(): Unit = {
-    // The amount we requested does not include the initial memory tracking threshold
-    taskMemoryManager.releaseExecutionMemory(
-      myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
+    freeOnHeapMemory(myMemoryThreshold - initialMemoryThreshold)
     myMemoryThreshold = initialMemoryThreshold
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 2410118..5141e36 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -416,4 +416,19 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("force to spill for external aggregation") {
+    val conf = createSparkConf(loadDefaults = false)
+      .set("spark.shuffle.memoryFraction", "0.01")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
+      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+    sc = new SparkContext("local", "test", conf)
+    val N = 2e5.toInt
+    sc.parallelize(1 to N, 2)
+      .map { i => (i, i) }
+      .groupByKey()
+      .reduceByKey(_ ++ _)
+      .count()
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index a1a7ac9..4dd8e31 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -608,4 +608,21 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
       }
     }
   }
+
+  test("force to spill for external sorter") {
+    val conf = createSparkConf(loadDefaults = false, kryo = false)
+      .set("spark.shuffle.memoryFraction", "0.01")
+      .set("spark.memory.useLegacyMode", "true")
+      .set("spark.testing.memory", "100000000")
+      .set("spark.shuffle.sort.bypassMergeThreshold", "0")
+    sc = new SparkContext("local", "test", conf)
+    val N = 2e5.toInt
+    val p = new org.apache.spark.HashPartitioner(2)
+    val p2 = new org.apache.spark.HashPartitioner(3)
+    sc.parallelize(1 to N, 3)
+      .map { x => (x % 100000) -> x.toLong }
+      .repartitionAndSortWithinPartitions(p)
+      .repartitionAndSortWithinPartitions(p2)
+      .count()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f369176/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 65b0a97..3c9f153 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -660,6 +660,9 @@ object MimaExcludes {
         // SPARK-14704: Create accumulators in TaskMetrics
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.this"),
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.this")
+      ) ++ Seq(
+        // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
+        ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
       )
     case v if v.startsWith("1.6") =>
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org