You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/21 16:09:40 UTC

[spark] branch master updated: [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dae7988  [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener
dae7988 is described below

commit dae79888dc6476892877d3b3b233381cdbf7fa74
Author: Vinoo Ganesh <vi...@gmail.com>
AuthorDate: Thu May 21 16:06:28 2020 +0000

    [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener
    
    ## What changes were proposed in this pull request?
    
    This change was made as a result of the conversation on https://issues.apache.org/jira/browse/SPARK-31354 and is intended to continue work from that ticket here.
    
    This change fixes a memory leak where SparkSession listeners are never cleared off of the SparkContext listener bus.
    
    Before running this PR, the following code:
    ```
    SparkSession.builder().master("local").getOrCreate()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    
    SparkSession.builder().master("local").getOrCreate()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    ```
    would result in a SparkContext with the following listeners on the listener bus:
    ```
    [org.apache.spark.status.AppStatusListener@5f610071,
    org.apache.spark.HeartbeatReceiver@d400c17,
    org.apache.spark.sql.SparkSession$$anon$1@25849aeb, <- First instance
    org.apache.spark.sql.SparkSession$$anon$1@fadb9a0] <- Second instance
    ```
    After this PR, the execution of the same code above results in a SparkContext with the following listeners on the listener bus:
    ```
    [org.apache.spark.status.AppStatusListener@5f610071,
    org.apache.spark.HeartbeatReceiver@d400c17,
    org.apache.spark.sql.SparkSession$$anon$1@25849aeb] <- One instance
    ```
    ## How was this patch tested?
    
    * Unit test included as a part of the PR
    
    Closes #28128 from vinooganesh/vinooganesh/SPARK-27958.
    
    Lead-authored-by: Vinoo Ganesh <vi...@gmail.com>
    Co-authored-by: Vinoo Ganesh <vg...@palantir.com>
    Co-authored-by: Vinoo Ganesh <vi...@safegraph.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/sql/SparkSession.scala  | 27 +++++++++++++---------
 .../spark/sql/SparkSessionBuilderSuite.scala       | 25 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index be597ed..60a6037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import java.io.Closeable
 import java.util.concurrent.TimeUnit._
-import java.util.concurrent.atomic.AtomicReference
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
@@ -49,7 +49,6 @@ import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.{CallSite, Utils}
 
-
 /**
  * The entry point to programming Spark with the Dataset and DataFrame API.
  *
@@ -940,15 +939,7 @@ object SparkSession extends Logging {
         options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
         setDefaultSession(session)
         setActiveSession(session)
-
-        // Register a successfully instantiated context to the singleton. This should be at the
-        // end of the class definition so that the singleton is updated only if there is no
-        // exception in the construction of the instance.
-        sparkContext.addSparkListener(new SparkListener {
-          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-            defaultSession.set(null)
-          }
-        })
+        registerContextListener(sparkContext)
       }
 
       return session
@@ -1064,6 +1055,20 @@ object SparkSession extends Logging {
   // Private methods from now on
   ////////////////////////////////////////////////////////////////////////////////////////
 
+  private val listenerRegistered: AtomicBoolean = new AtomicBoolean(false)
+
+  /** Registers the ApplicationEnd listener once; the flag is reset when the application ends. */
+  private def registerContextListener(sparkContext: SparkContext): Unit = {
+    if (listenerRegistered.compareAndSet(false, true)) { // atomic claim: one caller registers
+      sparkContext.addSparkListener(new SparkListener {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+          defaultSession.set(null)
+          listenerRegistered.set(false) // a later SparkContext must get its own listener
+        }
+      })
+    }
+  }
+
   /** The active SparkSession for the current thread. */
   private val activeThreadSession = new InheritableThreadLocal[SparkSession]
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 7b76d07..0a522fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -169,6 +169,31 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
   }
 
+  test("SPARK-31354: SparkContext only register one SparkSession ApplicationEnd listener") {
+    // Both sessions below are built on this single context, whose listener bus is
+    // inspected after each creation.
+    val sc = new SparkContext(
+      new SparkConf().setMaster("local").setAppName("test-app-SPARK-31354-1"))
+
+    // Builds a session on `sc`, reads how many listeners are on the context's listener bus
+    // right after that, and only then clears the active and default sessions. The count
+    // taken before the clearing is what gets returned.
+    def listenerCountAfterNewSession(): Int = {
+      SparkSession.builder().sparkContext(sc).master("local").getOrCreate()
+      val registered = sc.listenerBus.listeners.size()
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
+      registered
+    }
+
+    val postFirstCreation = listenerCountAfterNewSession()
+    val postSecondCreation = listenerCountAfterNewSession()
+
+    // Going through the builder a second time must not grow the listener list: the
+    // ApplicationEnd listener is expected to be on the context only once.
+    assert(postFirstCreation == postSecondCreation)
+  }
+
   test("SPARK-31532: should not propagate static sql configs to the existing" +
     " active/default SparkSession") {
     val session = SparkSession.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org