Posted to commits@spark.apache.org by va...@apache.org on 2017/02/17 19:28:21 UTC

spark git commit: [SPARK-18986][CORE] ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator

Repository: spark
Updated Branches:
  refs/heads/master 3d0c3af0a -> 4cc06f4eb


[SPARK-18986][CORE] ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator

## What changes were proposed in this pull request?

`ExternalAppendOnlyMap.forceSpill` currently asserts that the map's reading iterator is not null. However, that assertion only holds after the map has been asked for its iterator. Before that, if another memory consumer requests more memory than is currently available, `ExternalAppendOnlyMap.forceSpill` can also be called. In that case, we see a failure like this:

    [info]   java.lang.AssertionError: assertion failed
    [info]   at scala.Predef$.assert(Predef.scala:156)
    [info]   at org.apache.spark.util.collection.ExternalAppendOnlyMap.forceSpill(ExternalAppendOnlyMap.scala:196)
    [info]   at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:111)
    [info]   at org.apache.spark.util.collection.ExternalAppendOnlyMapSuite$$anonfun$13.apply$mcV$sp(ExternalAppendOnlyMapSuite.scala:294)

This fix is motivated by http://apache-spark-developers-list.1001551.n3.nabble.com/java-lang-AssertionError-assertion-failed-tc20277.html.
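
For illustration, here is a minimal sketch of the failing scenario, adapted from the regression test added below (`createExternalMap` is the test suite's helper; the literal sizes are illustrative):

    // Build a map but never call map.iterator, so readingIterator stays null.
    val map = createExternalMap[String]
    map.insertAll((1 to 1000).iterator.map(_.toString).map(i => (i, i)))

    // Another memory consumer needing memory forces this map to spill.
    val consumer = createExternalMap[String]

    // Before this patch, forceSpill() tripped assert(readingIterator != null)
    // and threw the AssertionError shown above; with the patch, the in-memory
    // map itself is spilled instead and the call completes normally.
    map.spill(10000, consumer)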

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #16387 from viirya/fix-externalappendonlymap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cc06f4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cc06f4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cc06f4e

Branch: refs/heads/master
Commit: 4cc06f4eb1fbf1ba1fc6165783e22f93dc3b14ac
Parents: 3d0c3af
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Feb 17 11:28:16 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Fri Feb 17 11:28:16 2017 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala    | 17 ++++++++++++-----
 .../collection/ExternalAppendOnlyMapSuite.scala    | 11 +++++++++++
 2 files changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cc06f4e/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 948cc3b..8aafda5 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -192,12 +192,19 @@ class ExternalAppendOnlyMap[K, V, C](
    * It will be called by TaskMemoryManager when there is not enough memory for the task.
    */
   override protected[this] def forceSpill(): Boolean = {
-    assert(readingIterator != null)
-    val isSpilled = readingIterator.spill()
-    if (isSpilled) {
-      currentMap = null
+    if (readingIterator != null) {
+      val isSpilled = readingIterator.spill()
+      if (isSpilled) {
+        currentMap = null
+      }
+      isSpilled
+    } else if (currentMap.size > 0) {
+      spill(currentMap)
+      currentMap = new SizeTrackingAppendOnlyMap[K, C]
+      true
+    } else {
+      false
     }
-    isSpilled
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4cc06f4e/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index c8b6a33..35312f2 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -283,6 +283,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     sc.stop()
   }
 
+  test("ExternalAppendOnlyMap shouldn't fail when forced to spill before calling its iterator") {
+    val size = 1000
+    val conf = createSparkConf(loadDefaults = true)
+    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    val map = createExternalMap[String]
+    val consumer = createExternalMap[String]
+    map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i)))
+    assert(map.spill(10000, consumer) == 0L)
+  }
+
   test("spilling with hash collisions") {
     val size = 1000
     val conf = createSparkConf(loadDefaults = true)

