You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/18 22:51:53 UTC

spark git commit: [SPARK-11126][SQL] Fix a memory leak in SQLListener._stageIdToStageMetrics

Repository: spark
Updated Branches:
  refs/heads/master a337c235a -> 94c8fef29


[SPARK-11126][SQL] Fix a memory leak in SQLListener._stageIdToStageMetrics

SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions, so entries for stages of non-SQL jobs are never removed and accumulate. This PR fixes it by ignoring stages that don't belong to SQL executions.

Reported by Terry Hoo in https://www.mail-archive.com/user@spark.apache.org/msg38810.html

Author: zsxwing <zs...@gmail.com>

Closes #9132 from zsxwing/SPARK-11126.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94c8fef2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94c8fef2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94c8fef2

Branch: refs/heads/master
Commit: 94c8fef296e5cdac9a93ed34acc079e51839caa7
Parents: a337c23
Author: zsxwing <zs...@gmail.com>
Authored: Sun Oct 18 13:51:45 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sun Oct 18 13:51:45 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/ui/SQLListener.scala      |  8 +++++++-
 .../spark/sql/execution/ui/SQLListenerSuite.scala | 18 ++++++++++++++++--
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94c8fef2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b302b51..5a072de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
     val stageId = stageSubmitted.stageInfo.stageId
     val stageAttemptId = stageSubmitted.stageInfo.attemptId
     // Always override metrics for old stage attempt
-    _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+    if (_stageIdToStageMetrics.contains(stageId)) {
+      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+    } else {
+      // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
+      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
+      // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
+    }
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/94c8fef2/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index cc1c1e1..03bcee9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -313,7 +313,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     assert(executionUIData.failedJobs === Seq(0))
   }
 
-  ignore("no memory leak") {
+  test("SPARK-11126: no memory leak when running non SQL jobs") {
+    val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size
+    sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ())
+    // listener should ignore the non SQL stage
+    assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber)
+
+    sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
+    // listener should save the SQL stage
+    assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
+  }
+
+}
+
+class SQLListenerMemoryLeakSuite extends SparkFunSuite {
+
+  test("no memory leak") {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
@@ -348,5 +363,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       sc.stop()
     }
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org