Posted to commits@spark.apache.org by va...@apache.org on 2017/11/13 21:50:25 UTC
spark git commit: [SPARK-22471][SQL] SQLListener consumes much memory causing OutOfMemoryError
Repository: spark
Updated Branches:
refs/heads/branch-2.2 af0b1855f -> d905e85d2
[SPARK-22471][SQL] SQLListener consumes much memory causing OutOfMemoryError
## What changes were proposed in this pull request?
This PR addresses the issue [SPARK-22471](https://issues.apache.org/jira/browse/SPARK-22471). The modified `SQLListener` respects the setting `spark.ui.retainedStages` and keeps the number of tracked stages within that limit. The hash map `_stageIdToStageMetrics` can no longer outgrow the limit, so overall memory consumption no longer grows over time.
This is a 2.2-compatible fix; it may be incompatible with 2.3 due to #19681.
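For reference, here is a minimal standalone sketch of the trimming strategy used in the patch below: an insertion-ordered `LinkedHashMap` whose oldest entries are dropped once a retention limit is exceeded. The names `retainedStages` and `stageIdToStageMetrics` mirror the patch, but `StageMetrics` is a stand-in and this snippet is illustrative, not the exact Spark code.

```scala
import scala.collection.mutable

object RetainedStagesSketch {
  // Stand-in for SQLStageMetrics; any per-stage payload works here.
  final case class StageMetrics(stageId: Long)

  val retainedStages = 50
  // LinkedHashMap iterates in insertion order, so the first entries are the oldest.
  val stageIdToStageMetrics = mutable.LinkedHashMap[Long, StageMetrics]()

  def trimStagesIfNecessary(): Unit = {
    val extraStages = stageIdToStageMetrics.size - retainedStages
    if (extraStages > 0) {
      // take(n) copies the n earliest-inserted entries; removing their keys
      // evicts the oldest stages first.
      val toRemove = stageIdToStageMetrics.take(extraStages).keys
      stageIdToStageMetrics --= toRemove
    }
  }

  def main(args: Array[String]): Unit = {
    (1L to 120L).foreach { id =>
      stageIdToStageMetrics(id) = StageMetrics(id)
      trimStagesIfNecessary()
    }
    // Only the 50 most recently inserted stage ids survive: 71 through 120.
    assert(stageIdToStageMetrics.size == retainedStages)
    assert(stageIdToStageMetrics.keys.head == 71L)
  }
}
```

Switching `_stageIdToStageMetrics` from `HashMap` to `LinkedHashMap` is what makes oldest-first eviction cheap: the map's iteration order doubles as an age order, at the cost of slightly higher per-entry overhead.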
## How was this patch tested?
A new unit test covers this fix; see `SQLListenerMemorySuite.scala`.
Author: Arseniy Tashoyan <ta...@gmail.com>
Closes #19711 from tashoyan/SPARK-22471-branch-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d905e85d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d905e85d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d905e85d
Branch: refs/heads/branch-2.2
Commit: d905e85d2f2229fc26e8af8f74771de38a25c577
Parents: af0b185
Author: Arseniy Tashoyan <ta...@gmail.com>
Authored: Mon Nov 13 13:50:12 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Nov 13 13:50:12 2017 -0800
----------------------------------------------------------------------
.../spark/sql/execution/ui/SQLListener.scala | 13 ++-
.../execution/ui/SQLListenerMemorySuite.scala | 106 +++++++++++++++++++
.../sql/execution/ui/SQLListenerSuite.scala | 47 +-------
3 files changed, 119 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b4a9123..e0c8cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -101,6 +101,9 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
+ private val retainedStages = conf.getInt("spark.ui.retainedStages",
+ SparkUI.DEFAULT_RETAINED_STAGES)
+
private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
// Old data in the following fields must be removed in "trimExecutionsIfNecessary".
@@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
*/
private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
- private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
+ private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, SQLStageMetrics]()
private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
@@ -207,6 +210,14 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
}
}
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+ val extraStages = _stageIdToStageMetrics.size - retainedStages
+ if (extraStages > 0) {
+ val toRemove = _stageIdToStageMetrics.take(extraStages).keys
+ _stageIdToStageMetrics --= toRemove
+ }
+ }
+
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
if (taskEnd.taskMetrics != null) {
updateTaskAccumulatorValues(
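The `take(extraStages)` call above depends on `LinkedHashMap` iterating in insertion order rather than key order; a quick standalone check of that property (plain Scala, no Spark needed):

```scala
import scala.collection.mutable

val m = mutable.LinkedHashMap(3 -> "c", 1 -> "a", 2 -> "b")
// take returns the earliest-inserted entries, not the smallest keys.
assert(m.take(2).keys.toSeq == Seq(3, 1))
```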
http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
new file mode 100644
index 0000000..24a09f3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.internal.config
+import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.functions._
+
+class SQLListenerMemorySuite extends SparkFunSuite {
+
+ test("SPARK-22471 - _stageIdToStageMetrics grows too large on long executions") {
+ quietly {
+ val conf = new SparkConf()
+ .setMaster("local[*]")
+ .setAppName("MemoryLeakTest")
+ /* Don't retry the tasks to run this test quickly */
+ .set(config.MAX_TASK_FAILURES, 1)
+ .set("spark.ui.retainedStages", "50")
+ withSpark(new SparkContext(conf)) { sc =>
+ SparkSession.sqlListener.set(null)
+ val spark = new SparkSession(sc)
+ import spark.implicits._
+
+ val sample = List(
+ (1, 10),
+ (2, 20),
+ (3, 30)
+ ).toDF("id", "value")
+
+ /* Some complex computation with many stages. */
+ val joins = 1 to 100
+ val summedCol: Column = joins
+ .map(j => col(s"value$j"))
+ .reduce(_ + _)
+ val res = joins
+ .map { j =>
+ sample.select($"id", $"value" * j as s"value$j")
+ }
+ .reduce(_.join(_, "id"))
+ .select($"id", summedCol as "value")
+ .groupBy("id")
+ .agg(sum($"value") as "value")
+ .orderBy("id")
+ res.collect()
+
+ sc.listenerBus.waitUntilEmpty(10000)
+ assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 50)
+ }
+ }
+ }
+
+ test("no memory leak") {
+ quietly {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
+ .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
+ withSpark(new SparkContext(conf)) { sc =>
+ SparkSession.sqlListener.set(null)
+ val spark = new SparkSession(sc)
+ import spark.implicits._
+ // Run 100 successful executions and 100 failed executions.
+ // Each execution only has one job and one stage.
+ for (i <- 0 until 100) {
+ val df = Seq(
+ (1, 1),
+ (2, 2)
+ ).toDF()
+ df.collect()
+ try {
+ df.foreach(_ => throw new RuntimeException("Oops"))
+ } catch {
+ case e: SparkException => // This is expected for a failed job
+ }
+ }
+ sc.listenerBus.waitUntilEmpty(10000)
+ assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
+ assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
+ // 50 for successful executions and 50 for failed executions
+ assert(spark.sharedState.listener.executionIdToData.size <= 100)
+ assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
+ assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index e6cd41e..23420a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -24,14 +24,12 @@ import org.mockito.Mockito.mock
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.config
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
@@ -485,46 +483,3 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
sc.emptyRDD
}
}
-
-
-class SQLListenerMemoryLeakSuite extends SparkFunSuite {
-
- test("no memory leak") {
- quietly {
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("test")
- .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
- .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
- val sc = new SparkContext(conf)
- try {
- SparkSession.sqlListener.set(null)
- val spark = new SparkSession(sc)
- import spark.implicits._
- // Run 100 successful executions and 100 failed executions.
- // Each execution only has one job and one stage.
- for (i <- 0 until 100) {
- val df = Seq(
- (1, 1),
- (2, 2)
- ).toDF()
- df.collect()
- try {
- df.foreach(_ => throw new RuntimeException("Oops"))
- } catch {
- case e: SparkException => // This is expected for a failed job
- }
- }
- sc.listenerBus.waitUntilEmpty(10000)
- assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
- assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
- // 50 for successful executions and 50 for failed executions
- assert(spark.sharedState.listener.executionIdToData.size <= 100)
- assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
- assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
- } finally {
- sc.stop()
- }
- }
- }
-}