Posted to commits@spark.apache.org by we...@apache.org on 2021/01/13 12:58:57 UTC

[spark] branch master updated: [SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b21ba  [SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled
f1b21ba is described below

commit f1b21ba50538bb2db9f958136bd15b43dc40ad5b
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Wed Jan 13 12:58:27 2021 +0000

    [SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled
    
    ### What changes were proposed in this pull request?
    #24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` to cancel the broadcast execution in the Future when a timeout happens. Because the `runId` is a random UUID rather than the inherited job group id, the broadcast sub-jobs keep executing after the SQL statement is cancelled. This PR uses the job group id of the outer thread as the `runId`, falling back to a random UUID when no job group id is set, so that these broadcast sub-jobs are aborted when the SQL statement is cancelled.
    
    ### Why are the changes needed?
    When broadcasting a table takes too long and the SQL statement is cancelled, the background Spark job keeps running and wastes resources.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested.
    Broadcasting a table is too fast to cancel in a unit test, but the fix is easy to verify manually:
    1. Start a Spark Thrift Server with limited resources on YARN.
    2. While the driver is running but no executors have been launched, submit a SQL statement from beeline that broadcasts tables.
    3. Cancel the SQL statement in beeline.
    
    Without the patch, broadcast sub-jobs won't be cancelled.
    ![Screen Shot 2021-01-11 at 12 03 13 PM](https://user-images.githubusercontent.com/1853780/104150975-ab024b00-5416-11eb-8bf9-b5167bdad80a.png)
    
    With this patch, broadcast sub-jobs will be cancelled.
    ![Screen Shot 2021-01-11 at 11 43 40 AM](https://user-images.githubusercontent.com/1853780/104150994-be151b00-5416-11eb-80ff-313d423c8a2e.png)
    
    Closes #31119 from LantaoJin/SPARK-34064.
    
    Authored-by: LantaoJin <ji...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/exchange/BroadcastExchangeExec.scala       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 0c5fee2..c322d5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
 
-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends BroadcastExchangeLike {
   import BroadcastExchangeExec._
 
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+      .map(UUID.fromString).getOrElse(UUID.randomUUID)
 
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),

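The `runId` selection in the diff above can be sketched outside Spark as follows. This is a minimal illustration, not the committed code: `RunIdSketch` and `chooseRunId` are hypothetical names, and the `jobGroupId` argument stands in for the value returned by `sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)`, which is assumed to be `null` when no job group has been set.

```scala
import java.util.UUID
import scala.util.Try

object RunIdSketch {
  // Hypothetical helper mirroring the expression in the diff:
  // reuse the job group id as the run id when present,
  // otherwise fall back to a fresh random UUID.
  def chooseRunId(jobGroupId: String): UUID =
    Option(jobGroupId).map(UUID.fromString).getOrElse(UUID.randomUUID)

  def main(args: Array[String]): Unit = {
    // A job group id that is a UUID string is reused as the run id.
    val group = UUID.randomUUID.toString
    println(chooseRunId(group).toString == group)

    // No job group id (null): a random UUID is generated instead.
    println(chooseRunId(null))

    // UUID.fromString throws IllegalArgumentException for a string
    // that is not in UUID format, so this Try is a Failure.
    println(Try(chooseRunId("not-a-uuid")).isFailure)
  }
}
```

Note that the expression relies on the job group id being a UUID-formatted string; `UUID.fromString` rejects other strings with an `IllegalArgumentException`.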
