You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@spark.apache.org by li...@apache.org on 2018/04/13 05:31:03 UTC
spark git commit: [SPARK-23971] Should not leak Spark sessions across test suites
Repository: spark
Updated Branches:
refs/heads/master ab7b961a4 -> 1018be44d
[SPARK-23971] Should not leak Spark sessions across test suites
## What changes were proposed in this pull request?
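Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and the default Spark session. This patch clears these sessions when a suite tears down. When a new shared session is created while a leaked one is still registered, the patch logs a warning that includes the leaked session's creation site and stops that session. Together these changes improve the reproducibility of tests.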
## How was this patch tested?
Existing tests
Author: Eric Liang <ek...@databricks.com>
Closes #21058 from ericl/clear-session.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1018be44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1018be44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1018be44
Branch: refs/heads/master
Commit: 1018be44d6c52cf18e14d84160850063f0e60a1d
Parents: ab7b961
Author: Eric Liang <ek...@databricks.com>
Authored: Thu Apr 12 22:30:59 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Apr 12 22:30:59 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/SharedSparkSession.java | 9 ++++++--
.../org/apache/spark/sql/SparkSession.scala | 23 ++++++++++++++++++--
.../apache/spark/sql/SessionStateSuite.scala | 2 ++
.../spark/sql/test/SharedSparkSession.scala | 22 ++++++++++++++-----
4 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
index 4377987..35a2509 100644
--- a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
+++ b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
@@ -42,7 +42,12 @@ public abstract class SharedSparkSession implements Serializable {
@After
public void tearDown() {
- spark.stop();
- spark = null;
+ try {
+ spark.stop();
+ spark = null;
+ } finally {
+ SparkSession.clearDefaultSession();
+ SparkSession.clearActiveSession();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index b107492..c502e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
/**
@@ -81,6 +81,9 @@ class SparkSession private(
@transient private[sql] val extensions: SparkSessionExtensions)
extends Serializable with Closeable with Logging { self =>
+ // The call site where this SparkSession was constructed.
+ private val creationSite: CallSite = Utils.getCallSite()
+
private[sql] def this(sc: SparkContext) {
this(sc, None, None, new SparkSessionExtensions)
}
@@ -763,7 +766,7 @@ class SparkSession private(
@InterfaceStability.Stable
-object SparkSession {
+object SparkSession extends Logging {
/**
* Builder for [[SparkSession]].
@@ -1090,4 +1093,20 @@ object SparkSession {
}
}
+ private[spark] def cleanupAnyExistingSession(): Unit = {
+ val session = getActiveSession.orElse(getDefaultSession)
+ if (session.isDefined) {
+ logWarning(
+ s"""An existing Spark session exists as the active or default session.
+ |This probably means another suite leaked it. Attempting to stop it before continuing.
+ |This existing Spark session was created at:
+ |
+ |${session.get.creationSite.longForm}
+ |
+ """.stripMargin)
+ session.get.stop()
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
index 4efae4c..7d13660 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
@@ -44,6 +44,8 @@ class SessionStateSuite extends SparkFunSuite {
if (activeSession != null) {
activeSession.stop()
activeSession = null
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
}
super.afterAll()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e758c86..8968dbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -60,6 +60,7 @@ trait SharedSparkSession
protected implicit def sqlContext: SQLContext = _spark.sqlContext
protected def createSparkSession: TestSparkSession = {
+ SparkSession.cleanupAnyExistingSession()
new TestSparkSession(sparkConf)
}
@@ -92,11 +93,22 @@ trait SharedSparkSession
* Stop the underlying [[org.apache.spark.SparkContext]], if any.
*/
protected override def afterAll(): Unit = {
- super.afterAll()
- if (_spark != null) {
- _spark.sessionState.catalog.reset()
- _spark.stop()
- _spark = null
+ try {
+ super.afterAll()
+ } finally {
+ try {
+ if (_spark != null) {
+ try {
+ _spark.sessionState.catalog.reset()
+ } finally {
+ _spark.stop()
+ _spark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org