You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/11/13 21:50:25 UTC

spark git commit: [SPARK-22471][SQL] SQLListener consumes much memory causing OutOfMemoryError

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 af0b1855f -> d905e85d2


[SPARK-22471][SQL] SQLListener consumes much memory causing OutOfMemoryError

## What changes were proposed in this pull request?

This PR addresses the issue [SPARK-22471](https://issues.apache.org/jira/browse/SPARK-22471). The modified version of `SQLListener` respects the setting `spark.ui.retainedStages` and keeps the number of the tracked stages within the specified limit. The hash map `_stageIdToStageMetrics` does not outgrow the limit, hence overall memory consumption does not grow with time anymore.

A 2.2-compatible fix. Maybe incompatible with 2.3 due to #19681.

## How was this patch tested?

A new unit test covers this fix - see `SQLListenerMemorySuite.scala`.

Author: Arseniy Tashoyan <ta...@gmail.com>

Closes #19711 from tashoyan/SPARK-22471-branch-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d905e85d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d905e85d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d905e85d

Branch: refs/heads/branch-2.2
Commit: d905e85d2f2229fc26e8af8f74771de38a25c577
Parents: af0b185
Author: Arseniy Tashoyan <ta...@gmail.com>
Authored: Mon Nov 13 13:50:12 2017 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Nov 13 13:50:12 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/ui/SQLListener.scala    |  13 ++-
 .../execution/ui/SQLListenerMemorySuite.scala   | 106 +++++++++++++++++++
 .../sql/execution/ui/SQLListenerSuite.scala     |  47 +-------
 3 files changed, 119 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index b4a9123..e0c8cb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -101,6 +101,9 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
   private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
 
+  private val retainedStages = conf.getInt("spark.ui.retainedStages",
+    SparkUI.DEFAULT_RETAINED_STAGES)
+
   private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
 
   // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
@@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
    */
   private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
 
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
+  private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, SQLStageMetrics]()
 
   private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
 
@@ -207,6 +210,14 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
     }
   }
 
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+    val extraStages = _stageIdToStageMetrics.size - retainedStages
+    if (extraStages > 0) {
+      val toRemove = _stageIdToStageMetrics.take(extraStages).keys
+      _stageIdToStageMetrics --= toRemove
+    }
+  }
+
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
     if (taskEnd.taskMetrics != null) {
       updateTaskAccumulatorValues(

http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
new file mode 100644
index 0000000..24a09f3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerMemorySuite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.internal.config
+import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.functions._
+
+class SQLListenerMemorySuite extends SparkFunSuite {
+
+  test("SPARK-22471 - _stageIdToStageMetrics grows too large on long executions") {
+    quietly {
+      val conf = new SparkConf()
+        .setMaster("local[*]")
+        .setAppName("MemoryLeakTest")
+        /* Don't retry the tasks to run this test quickly */
+        .set(config.MAX_TASK_FAILURES, 1)
+        .set("spark.ui.retainedStages", "50")
+      withSpark(new SparkContext(conf)) { sc =>
+        SparkSession.sqlListener.set(null)
+        val spark = new SparkSession(sc)
+        import spark.implicits._
+
+        val sample = List(
+          (1, 10),
+          (2, 20),
+          (3, 30)
+        ).toDF("id", "value")
+
+        /* Some complex computation with many stages. */
+        val joins = 1 to 100
+        val summedCol: Column = joins
+          .map(j => col(s"value$j"))
+          .reduce(_ + _)
+        val res = joins
+          .map { j =>
+            sample.select($"id", $"value" * j as s"value$j")
+          }
+          .reduce(_.join(_, "id"))
+          .select($"id", summedCol as "value")
+          .groupBy("id")
+          .agg(sum($"value") as "value")
+          .orderBy("id")
+        res.collect()
+
+        sc.listenerBus.waitUntilEmpty(10000)
+        assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 50)
+      }
+    }
+  }
+
+  test("no memory leak") {
+    quietly {
+      val conf = new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
+        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
+      withSpark(new SparkContext(conf)) { sc =>
+        SparkSession.sqlListener.set(null)
+        val spark = new SparkSession(sc)
+        import spark.implicits._
+        // Run 100 successful executions and 100 failed executions.
+        // Each execution only has one job and one stage.
+        for (i <- 0 until 100) {
+          val df = Seq(
+            (1, 1),
+            (2, 2)
+          ).toDF()
+          df.collect()
+          try {
+            df.foreach(_ => throw new RuntimeException("Oops"))
+          } catch {
+            case e: SparkException => // This is expected for a failed job
+          }
+        }
+        sc.listenerBus.waitUntilEmpty(10000)
+        assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
+        assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
+        // 50 for successful executions and 50 for failed executions
+        assert(spark.sharedState.listener.executionIdToData.size <= 100)
+        assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
+        assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d905e85d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index e6cd41e..23420a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -24,14 +24,12 @@ import org.mockito.Mockito.mock
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.internal.config
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -485,46 +483,3 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
     sc.emptyRDD
   }
 }
-
-
-class SQLListenerMemoryLeakSuite extends SparkFunSuite {
-
-  test("no memory leak") {
-    quietly {
-      val conf = new SparkConf()
-        .setMaster("local")
-        .setAppName("test")
-        .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
-        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
-      val sc = new SparkContext(conf)
-      try {
-        SparkSession.sqlListener.set(null)
-        val spark = new SparkSession(sc)
-        import spark.implicits._
-        // Run 100 successful executions and 100 failed executions.
-        // Each execution only has one job and one stage.
-        for (i <- 0 until 100) {
-          val df = Seq(
-            (1, 1),
-            (2, 2)
-          ).toDF()
-          df.collect()
-          try {
-            df.foreach(_ => throw new RuntimeException("Oops"))
-          } catch {
-            case e: SparkException => // This is expected for a failed job
-          }
-        }
-        sc.listenerBus.waitUntilEmpty(10000)
-        assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
-        assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
-        // 50 for successful executions and 50 for failed executions
-        assert(spark.sharedState.listener.executionIdToData.size <= 100)
-        assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
-        assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
-      } finally {
-        sc.stop()
-      }
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org