You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/18 22:51:53 UTC
spark git commit: [SPARK-11126][SQL] Fix a memory leak in
SQLListener._stageIdToStageMetrics
Repository: spark
Updated Branches:
refs/heads/master a337c235a -> 94c8fef29
[SPARK-11126][SQL] Fix a memory leak in SQLListener._stageIdToStageMetrics
SQLListener adds all stage infos to `_stageIdToStageMetrics`, but only removes stage infos belonging to SQL executions, so entries for non-SQL stages are never removed. This PR fixes the leak by ignoring stages that don't belong to SQL executions.
Reported by Terry Hoo in https://www.mail-archive.com/user@spark.apache.org/msg38810.html
Author: zsxwing <zs...@gmail.com>
Closes #9132 from zsxwing/SPARK-11126.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94c8fef2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94c8fef2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94c8fef2
Branch: refs/heads/master
Commit: 94c8fef296e5cdac9a93ed34acc079e51839caa7
Parents: a337c23
Author: zsxwing <zs...@gmail.com>
Authored: Sun Oct 18 13:51:45 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sun Oct 18 13:51:45 2015 -0700
----------------------------------------------------------------------
.../spark/sql/execution/ui/SQLListener.scala | 8 +++++++-
.../spark/sql/execution/ui/SQLListenerSuite.scala | 18 ++++++++++++++++--
2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/94c8fef2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b302b51..5a072de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -126,7 +126,13 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
val stageId = stageSubmitted.stageInfo.stageId
val stageAttemptId = stageSubmitted.stageInfo.attemptId
// Always override metrics for old stage attempt
- _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+ if (_stageIdToStageMetrics.contains(stageId)) {
+ _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
+ } else {
+ // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
+ // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
+ // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
+ }
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/94c8fef2/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index cc1c1e1..03bcee9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -313,7 +313,22 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
assert(executionUIData.failedJobs === Seq(0))
}
- ignore("no memory leak") {
+ test("SPARK-11126: no memory leak when running non SQL jobs") {
+ val previousStageNumber = sqlContext.listener.stageIdToStageMetrics.size
+ sqlContext.sparkContext.parallelize(1 to 10).foreach(i => ())
+ // listener should ignore the non SQL stage
+ assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber)
+
+ sqlContext.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
+ // listener should save the SQL stage
+ assert(sqlContext.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
+ }
+
+}
+
+class SQLListenerMemoryLeakSuite extends SparkFunSuite {
+
+ test("no memory leak") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
@@ -348,5 +363,4 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
sc.stop()
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org