Posted to commits@spark.apache.org by hv...@apache.org on 2020/02/10 21:17:53 UTC

[spark] branch branch-3.0 updated: [SPARK-30556][SQL][FOLLOWUP] Reset the status changed in SQLExecution.withThreadLocalCaptured

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3038a81  [SPARK-30556][SQL][FOLLOWUP] Reset the status changed in SQLExecution.withThreadLocalCaptured
3038a81 is described below

commit 3038a81ecdb526b01e80eeb34a7eacc6ac48d360
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Mon Feb 10 22:16:25 2020 +0100

    [SPARK-30556][SQL][FOLLOWUP] Reset the status changed in SQLExecution.withThreadLocalCaptured
    
    ### What changes were proposed in this pull request?
    Follow-up for #27267: reset the status changed in SQLExecution.withThreadLocalCaptured.
    
    ### Why are the changes needed?
    For code safety: withThreadLocalCaptured runs the body on a separate thread, so the
    active session and local properties it installs should be restored afterwards rather
    than leaking to whatever runs next on that pool thread.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UT.
    
    Closes #27516 from xuanyuanking/SPARK-30556-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: herman <he...@databricks.com>
    (cherry picked from commit a6b91d2bf727e175d0e175295001db85647539b1)
    Signed-off-by: herman <he...@databricks.com>
---
 .../scala/org/apache/spark/sql/execution/SQLExecution.scala  | 12 +++++++++++-
 .../apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala | 10 ++++++----
 2 files changed, 17 insertions(+), 5 deletions(-)
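
The "code safety" concern here is thread reuse: withThreadLocalCaptured hands its body to an executor, and before this follow-up the active SparkSession and the SparkContext local properties installed for that body were simply left on the pool thread. Below is a self-contained sketch (plain Scala with a ThreadLocal, not Spark code; all names are invented for illustration) of how state installed by one task on a pooled thread leaks into the next task on that thread unless it is reset:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // Hypothetical illustration, not Spark code: pool threads are reused, so
    // per-thread state installed for one task stays visible to the next task
    // scheduled on the same thread unless it is explicitly reset.
    object ThreadLocalLeakSketch {
      private val activeState = new ThreadLocal[String]

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

        // The first task installs per-thread state and never cleans it up.
        Await.result(Future { activeState.set("session-A") }, Duration.Inf)

        // The second task runs on the same (single) pool thread and still sees it.
        val leaked = Await.result(Future { Option(activeState.get()) }, Duration.Inf)
        println(s"second task saw: $leaked") // prints: second task saw: Some(session-A)

        pool.shutdown()
      }
    }

With a single pool thread the leak is deterministic; against a shared executor it surfaces intermittently, which is the kind of cross-task contamination the restore added below guards against.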

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 995d94e..9f17781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -177,9 +177,19 @@ object SQLExecution {
     val sc = sparkSession.sparkContext
     val localProps = Utils.cloneProperties(sc.getLocalProperties)
     Future {
+      val originalSession = SparkSession.getActiveSession
+      val originalLocalProps = sc.getLocalProperties
       SparkSession.setActiveSession(activeSession)
       sc.setLocalProperties(localProps)
-      body
+      val res = body
+      // reset active session and local props.
+      sc.setLocalProperties(originalLocalProps)
+      if (originalSession.nonEmpty) {
+        SparkSession.setActiveSession(originalSession.get)
+      } else {
+        SparkSession.clearActiveSession()
+      }
+      res
     }(exec)
   }
 }
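
The hunk above follows a save/install/run/restore pattern. A minimal stand-alone sketch of the same pattern, assuming a plain ThreadLocal in place of the active-session tracking and local properties (all names are invented for illustration):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object SaveRestoreSketch {
      // Stand-in for the per-thread state that the body expects to see.
      private val activeState = new ThreadLocal[Option[String]] {
        override def initialValue(): Option[String] = None
      }

      // Run body on the pool with captured installed as the thread-local state,
      // then put back whatever the worker thread held before, mirroring the hunk above.
      def withCaptured[T](captured: String)(body: => T)
                         (implicit ec: ExecutionContext): Future[T] = Future {
        val original = activeState.get()   // save the worker thread's state
        activeState.set(Some(captured))    // install the captured state
        val res = body
        activeState.set(original)          // restore the previous state
        res
      }

      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

        val f = withCaptured("captured-session") {
          s"body saw: ${activeState.get().getOrElse("<none>")}"
        }
        println(Await.result(f, Duration.Inf)) // body saw: captured-session

        // The worker thread's state was put back, so a later task sees None again.
        println(Await.result(Future { activeState.get() }, Duration.Inf)) // None

        pool.shutdown()
      }
    }

Like the actual change, the sketch restores state only on the success path; wrapping the restore in try/finally would also cover a body that throws, but that is not what this commit does.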
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 0cc658c..46d0c64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.internal
 
+import java.util.UUID
+
 import org.scalatest.Assertions._
 
 import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
@@ -144,16 +146,16 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
         }
 
         // set local configuration and assert
-        val confValue1 = "e"
+        val confValue1 = UUID.randomUUID().toString()
         createDataframe(confKey, confValue1).createOrReplaceTempView("m")
         spark.sparkContext.setLocalProperty(confKey, confValue1)
-        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect.size == 1)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM m)").collect().length == 1)
 
         // change the conf value and assert again
-        val confValue2 = "f"
+        val confValue2 = UUID.randomUUID().toString()
         createDataframe(confKey, confValue2).createOrReplaceTempView("n")
         spark.sparkContext.setLocalProperty(confKey, confValue2)
-        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().size == 1)
+        assert(sql("SELECT * FROM l WHERE EXISTS (SELECT * FROM n)").collect().length == 1)
       }
     }
   }
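
On the test side, the fixed markers "e" and "f" become random UUIDs, so an assertion can only pass when the value just set actually propagated, never because a constant happened to match something left over from an earlier assertion. A tiny sketch of that idea (names invented for illustration):

    import java.util.UUID

    object UniqueConfValueSketch {
      def main(args: Array[String]): Unit = {
        val confValue1 = UUID.randomUUID().toString
        val confValue2 = UUID.randomUUID().toString
        // Random values are (practically) never equal to each other or to any
        // stale value, so a successful comparison really proves propagation.
        assert(confValue1 != confValue2)
        println(s"first=$confValue1 second=$confValue2")
      }
    }

The collect().size to collect().length change in the same hunk is stylistic: collect() returns an Array, and length is the array's direct accessor, while size goes through an implicit conversion.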

