Posted to commits@spark.apache.org by sr...@apache.org on 2019/04/06 22:14:51 UTC

[spark] branch master updated: [SPARK-26992][STS] Fix STS scheduler pool correct delivery

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6450c59  [SPARK-26992][STS] Fix STS scheduler pool correct delivery
6450c59 is described below

commit 6450c5948adb58cbf3afaaf249560c81a4164cf6
Author: cxzl25 <cx...@users.noreply.github.com>
AuthorDate: Sat Apr 6 17:14:29 2019 -0500

    [SPARK-26992][STS] Fix STS scheduler pool correct delivery
    
    ## What changes were proposed in this pull request?
    
    When a user sets spark.sql.thriftserver.scheduler.pool, the Spark Thrift Server
    stores the value in a thread-local SparkContext local property but never clears
    it after the statement finishes. Because statements execute on pooled threads,
    later statements from other sessions can silently run in the previously set pool.
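    
    To make the failure mode concrete, here is a minimal sketch (not part of this
    patch; the pool name "accounting" and the single-thread executor are invented
    for the demo) showing that a SparkContext local property set on a pooled
    thread outlives the task that set it:
    
    ```scala
    import java.util.concurrent.Executors
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object PoolLeakDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("pool-leak-demo"))
        // Like STS, run work on a pool whose threads are reused across "sessions".
        val executor = Executors.newFixedThreadPool(1)
    
        // "Session A" sets a pool name and never clears it.
        executor.submit(new Runnable {
          def run(): Unit = sc.setLocalProperty("spark.scheduler.pool", "accounting")
        }).get()
    
        // "Session B" reuses the same thread and inherits "accounting".
        executor.submit(new Runnable {
          def run(): Unit =
            println(sc.getLocalProperty("spark.scheduler.pool")) // prints "accounting"
        }).get()
    
        executor.shutdown()
        sc.stop()
      }
    }
    ```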
    
    ## How was this patch tested?
    
    manual tests
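    
    One way to reproduce by hand (an assumed recipe, not detailed in the patch):
    open two Beeline sessions against the Thrift Server, run
    SET spark.sql.thriftserver.scheduler.pool=accounting followed by a query in
    the first session, then run a query in the second session and check which
    scheduler pool its jobs land in on the Spark UI. Before this fix, the second
    session's query could run in the "accounting" pool; after it, the property is
    cleared between statements.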
    
    Closes #23895 from cxzl25/thrift_server_scheduler_pool_pollute.
    
    Lead-authored-by: cxzl25 <cx...@users.noreply.github.com>
    Co-authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../SparkExecuteStatementOperation.scala           | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index b05307e..3862d6c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -109,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
+  def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool {
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
     setHasResultSet(true)
@@ -210,7 +210,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  private def execute(): Unit = {
+  private def execute(): Unit = withSchedulerPool {
     statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)
@@ -225,10 +225,6 @@ private[hive] class SparkExecuteStatementOperation(
       statementId,
       parentSession.getUsername)
     sqlContext.sparkContext.setJobGroup(statementId, statement)
-    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
-    // It may have unpredictably behavior since we use thread pools to execute quries and
-    // the 'spark.scheduler.pool' may not be 'default' when we did not set its value.(SPARK-26914)
-    sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
     try {
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
@@ -291,6 +287,20 @@ private[hive] class SparkExecuteStatementOperation(
       sqlContext.sparkContext.cancelJobGroup(statementId)
     }
   }
+
+  private def withSchedulerPool[T](body: => T): T = {
+    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
+    if (pool != null) {
+      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
+    }
+    try {
+      body
+    } finally {
+      if (pool != null) {
+        sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null)
+      }
+    }
+  }
 }
 
 object SparkExecuteStatementOperation {

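The replacement wraps both getNextRowSet() and execute() in withSchedulerPool,
which follows the classic save-and-restore idiom for thread-local state: install
the per-session value, run the body, and reset in a finally block so pooled
threads hand the next statement a clean slate. A generalized sketch of the same
idiom (withLocalProperty is a hypothetical helper, not Spark API; unlike the
patch, it restores the caller's previous value rather than clearing to null):

```scala
import org.apache.spark.SparkContext

// Install a thread-local property for the duration of `body`, then restore
// the caller's previous value even if `body` throws.
def withLocalProperty[T](sc: SparkContext, key: String, value: String)(body: => T): T = {
  val previous = sc.getLocalProperty(key) // null if the property was unset
  sc.setLocalProperty(key, value)
  try {
    body
  } finally {
    sc.setLocalProperty(key, previous) // setting null removes the property
  }
}
```

For STS itself, the per-session pool name is recorded in sessionToActivePool
when a session executes SET spark.sql.thriftserver.scheduler.pool=<name>;
withSchedulerPool reads that map before each statement and clears the
spark.scheduler.pool local property afterwards.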

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org