Posted to commits@spark.apache.org by li...@apache.org on 2018/04/13 05:31:03 UTC

spark git commit: [SPARK-23971] Should not leak Spark sessions across test suites

Repository: spark
Updated Branches:
  refs/heads/master ab7b961a4 -> 1018be44d


[SPARK-23971] Should not leak Spark sessions across test suites

## What changes were proposed in this pull request?

Many suites currently leak Spark sessions (sometimes with stopped SparkContexts) via the thread-local active Spark session and default Spark session. We should attempt to clean these up and detect when this happens to improve the reproducibility of tests.
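
For context, a minimal sketch (not part of the patch) of the teardown pattern this change applies in the shared test traits. The trait name `CleansUpSparkSession` is made up for illustration; `SparkSession.clearActiveSession()` and `SparkSession.clearDefaultSession()` are the existing companion-object methods the patch relies on to drop the thread-local active session and the global default session.

```scala
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, Suite}

// Hypothetical trait illustrating the cleanup pattern: even after
// spark.stop(), the session stays reachable through the thread-local
// active session and the global default session unless both are cleared.
trait CleansUpSparkSession extends BeforeAndAfterAll { self: Suite =>

  @transient protected var spark: SparkSession = null

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local[2]")
      .appName("example-suite")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    try {
      super.afterAll()
    } finally {
      try {
        if (spark != null) {
          spark.stop()
          spark = null
        }
      } finally {
        // Without these calls, a stopped session can leak into the next
        // suite via SparkSession.getActiveSession / getDefaultSession.
        SparkSession.clearActiveSession()
        SparkSession.clearDefaultSession()
      }
    }
  }
}
```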

## How was this patch tested?

Existing tests

Author: Eric Liang <ek...@databricks.com>

Closes #21058 from ericl/clear-session.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1018be44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1018be44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1018be44

Branch: refs/heads/master
Commit: 1018be44d6c52cf18e14d84160850063f0e60a1d
Parents: ab7b961
Author: Eric Liang <ek...@databricks.com>
Authored: Thu Apr 12 22:30:59 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Apr 12 22:30:59 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SharedSparkSession.java    |  9 ++++++--
 .../org/apache/spark/sql/SparkSession.scala     | 23 ++++++++++++++++++--
 .../apache/spark/sql/SessionStateSuite.scala    |  2 ++
 .../spark/sql/test/SharedSparkSession.scala     | 22 ++++++++++++++-----
 4 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
index 4377987..35a2509 100644
--- a/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
+++ b/mllib/src/test/java/org/apache/spark/SharedSparkSession.java
@@ -42,7 +42,12 @@ public abstract class SharedSparkSession implements Serializable {
 
   @After
   public void tearDown() {
-    spark.stop();
-    spark = null;
+    try {
+      spark.stop();
+      spark = null;
+    } finally {
+      SparkSession.clearDefaultSession();
+      SparkSession.clearActiveSession();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index b107492..c502e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CallSite, Utils}
 
 
 /**
@@ -81,6 +81,9 @@ class SparkSession private(
     @transient private[sql] val extensions: SparkSessionExtensions)
   extends Serializable with Closeable with Logging { self =>
 
+  // The call site where this SparkSession was constructed.
+  private val creationSite: CallSite = Utils.getCallSite()
+
   private[sql] def this(sc: SparkContext) {
     this(sc, None, None, new SparkSessionExtensions)
   }
@@ -763,7 +766,7 @@ class SparkSession private(
 
 
 @InterfaceStability.Stable
-object SparkSession {
+object SparkSession extends Logging {
 
   /**
    * Builder for [[SparkSession]].
@@ -1090,4 +1093,20 @@ object SparkSession {
     }
   }
 
+  private[spark] def cleanupAnyExistingSession(): Unit = {
+    val session = getActiveSession.orElse(getDefaultSession)
+    if (session.isDefined) {
+      logWarning(
+        s"""An existing Spark session exists as the active or default session.
+           |This probably means another suite leaked it. Attempting to stop it before continuing.
+           |This existing Spark session was created at:
+           |
+           |${session.get.creationSite.longForm}
+           |
+         """.stripMargin)
+      session.get.stop()
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
index 4efae4c..7d13660 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
@@ -44,6 +44,8 @@ class SessionStateSuite extends SparkFunSuite {
     if (activeSession != null) {
       activeSession.stop()
       activeSession = null
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
     }
     super.afterAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1018be44/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e758c86..8968dbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -60,6 +60,7 @@ trait SharedSparkSession
   protected implicit def sqlContext: SQLContext = _spark.sqlContext
 
   protected def createSparkSession: TestSparkSession = {
+    SparkSession.cleanupAnyExistingSession()
     new TestSparkSession(sparkConf)
   }
 
@@ -92,11 +93,22 @@ trait SharedSparkSession
    * Stop the underlying [[org.apache.spark.SparkContext]], if any.
    */
   protected override def afterAll(): Unit = {
-    super.afterAll()
-    if (_spark != null) {
-      _spark.sessionState.catalog.reset()
-      _spark.stop()
-      _spark = null
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org