Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/23 09:43:31 UTC

spark git commit: [SPARK-9266] Prevent "managed memory leak detected" exception from masking original exception

Repository: spark
Updated Branches:
  refs/heads/master b983d493b -> ac3ae0f2b


[SPARK-9266] Prevent "managed memory leak detected" exception from masking original exception

When a task fails with an exception and also fails to properly clean up its managed memory, the exception thrown by the `spark.unsafe.exceptionOnMemoryLeak` leak-detection mechanism masks the original exception that caused the task to fail. We should throw the memory leak exception only if no other exception occurred.
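
For readers unfamiliar with the pitfall: this masking follows from JVM semantics. If both a `try` body and its `finally` block throw, only the `finally` exception propagates and the original is silently discarded. A minimal standalone Scala sketch (illustrative only, not Spark code):

    object FinallyMaskingDemo {
      def main(args: Array[String]): Unit = {
        try {
          try {
            throw new RuntimeException("original task failure")
          } finally {
            // Throwing from finally discards the RuntimeException above.
            throw new IllegalStateException("managed memory leak detected")
          }
        } catch {
          // Prints the leak message; the original cause is lost.
          case e: Exception => println(e.getMessage)
        }
      }
    }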

Author: Josh Rosen <jo...@databricks.com>

Closes #7603 from JoshRosen/SPARK-9266 and squashes the following commits:

c268cb5 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-9266
c1f0167 [Josh Rosen] Fix the error masking problem
448eae8 [Josh Rosen] Add regression test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac3ae0f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac3ae0f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac3ae0f2

Branch: refs/heads/master
Commit: ac3ae0f2be88e0b53f65342efe5fcbe67b5c2106
Parents: b983d49
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Jul 23 00:43:26 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 23 00:43:26 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  7 ++++--
 .../scala/org/apache/spark/FailureSuite.scala   | 25 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac3ae0f2/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 581b400..e76664f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -209,16 +209,19 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
+        var threwException = true
         val (value, accumUpdates) = try {
-          task.run(
+          val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = attemptNumber,
             metricsSystem = env.metricsSystem)
+          threwException = false
+          res
         } finally {
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
-            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
               throw new SparkException(errMsg)
             } else {
               logError(errMsg)
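
The fix above is a success-flag pattern: `threwException` starts out true and is cleared only after `task.run` returns, so the `finally` block can tell whether an exception is already propagating and avoid replacing it. A simplified sketch of the same pattern as a hypothetical helper (not part of Spark):

    def runWithLeakCheck[T](body: => T)(leakCheck: Boolean => Unit): T = {
      var threwException = true
      try {
        val result = body        // if this throws, the flag stays true
        threwException = false   // reached only on the success path
        result
      } finally {
        leakCheck(threwException) // callback should throw only when the flag is false
      }
    }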

http://git-wip-us.apache.org/repos/asf/spark/blob/ac3ae0f2/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index b099cd3..69cb4b4 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("managed memory leak error should not mask other failures (SPARK-9266") {
+    val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    sc = new SparkContext("local[1,1]", "test", conf)
+
+    // If a task leaks memory but fails due to some other cause, then make sure that the original
+    // cause is preserved.
+    val thrownDueToTaskFailure = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        throw new Exception("intentional task failure")
+        iter
+      }.count()
+    }
+    assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure"))
+
+    // If the task succeeded but memory was leaked, then the task should fail due to that leak
+    val thrownDueToMemoryLeak = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        iter
+      }.count()
+    }
+    assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
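
The regression test relies on ScalaTest's `intercept`, which runs a block, asserts that an exception of the expected type is thrown, and returns it for inspection; the `local[1,1]` master runs one worker thread with maxFailures = 1, so the first task failure fails the job immediately instead of being retried. A minimal usage sketch of `intercept` outside Spark (import path shown for recent ScalaTest; older releases used `org.scalatest.FunSuite`):

    import org.scalatest.funsuite.AnyFunSuite

    class InterceptExample extends AnyFunSuite {
      test("intercept returns the thrown exception") {
        val e = intercept[IllegalArgumentException] {
          require(false, "boom") // throws IllegalArgumentException
        }
        assert(e.getMessage.contains("boom"))
      }
    }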

