Posted to commits@spark.apache.org by we...@apache.org on 2021/01/13 12:58:57 UTC
[spark] branch master updated: [SPARK-34064][SQL] Cancel the
running broadcast sub-jobs when SQL statement is cancelled
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f1b21ba [SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled
f1b21ba is described below
commit f1b21ba50538bb2db9f958136bd15b43dc40ad5b
Author: LantaoJin <ji...@gmail.com>
AuthorDate: Wed Jan 13 12:58:27 2021 +0000
[SPARK-34064][SQL] Cancel the running broadcast sub-jobs when SQL statement is cancelled
### What changes were proposed in this pull request?
#24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` so that the broadcast execution running in a Future could be cancelled when a timeout happens. Because the runId is a random UUID rather than the inherited job group id, the broadcast sub-jobs keep running after the SQL statement is cancelled. This PR uses the job group id of the calling thread as `runId` (falling back to a random UUID when no job group is set), so that these broadcast sub-jobs are aborted when the SQL statement is cancelled.
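A minimal, Spark-free sketch of the new `runId` derivation. In the real code the input comes from `sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)`, which returns `null` when the thread has no job group; here it is passed in directly, and `RunIdSketch`/`deriveRunId` are hypothetical names. The sketch assumes the job group id is UUID-formatted; any other string makes `UUID.fromString` throw `IllegalArgumentException`.

```scala
import java.util.UUID

// Hypothetical stand-in for the patched line in BroadcastExchangeExec.
object RunIdSketch {
  def deriveRunId(jobGroupId: String): UUID =
    // Option(null) is None, so a thread without a job group gets a fresh random UUID.
    // If a group id is present but not UUID-shaped, UUID.fromString throws
    // IllegalArgumentException.
    Option(jobGroupId).map(UUID.fromString).getOrElse(UUID.randomUUID)
}

// Hypothetical statement job group id; the broadcast runId reuses it unchanged.
val statementGroupId = "123e4567-e89b-12d3-a456-426614174000"
println(RunIdSketch.deriveRunId(statementGroupId)) // prints 123e4567-e89b-12d3-a456-426614174000
```

Because the runId now equals the statement's job group id, cancelling that group also covers the broadcast sub-jobs.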
### Why are the changes needed?
When broadcasting a table takes too long and the SQL statement is cancelled, the background Spark job keeps running and wastes resources.
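The problem can be illustrated with a toy, single-threaded model of job-group cancellation. `ToyScheduler` and `CancellationDemo` are hypothetical; they only loosely mirror the idea behind `SparkContext.cancelJobGroup(groupId)`, assuming that cancelling a statement cancels the jobs tagged with its job group id. A broadcast sub-job tagged with an unrelated random runId survives that cancellation, while one that inherits the statement's group id does not.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical bookkeeping: each running job remembers the group id it was submitted under.
final class ToyScheduler {
  private val running = mutable.LinkedHashMap.empty[String, String] // job name -> group id

  def submit(jobName: String, groupId: String): Unit = running(jobName) = groupId

  // Cancels (removes) every running job whose group id matches exactly.
  def cancelJobGroup(groupId: String): Unit = {
    val matching = running.collect { case (job, g) if g == groupId => job }.toList
    matching.foreach(job => running -= job)
  }

  def stillRunning: List[String] = running.keys.toList
}

object CancellationDemo {
  // One SQL statement spawns a broadcast sub-job, then the user cancels the statement.
  def run(inheritGroupId: Boolean): List[String] = {
    val scheduler = new ToyScheduler
    val statementGroup = UUID.randomUUID.toString
    // Before the patch: random runId. After: runId taken from the statement's job group.
    val broadcastRunId = if (inheritGroupId) statementGroup else UUID.randomUUID.toString
    scheduler.submit("main query job", statementGroup)
    scheduler.submit("broadcast sub-job", broadcastRunId)
    scheduler.cancelJobGroup(statementGroup)
    scheduler.stillRunning
  }
}

println(CancellationDemo.run(inheritGroupId = false)) // prints List(broadcast sub-job)
println(CancellationDemo.run(inheritGroupId = true))  // prints List()
```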
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested.
Broadcasting a table completes too quickly to be cancelled in a unit test, but the fix is easy to verify manually:
1. Start a Spark Thrift server on YARN with limited resources.
2. While the driver is running but no executors have been launched yet, submit from beeline a SQL query that broadcasts tables.
3. Cancel the query in beeline.
Without the patch, broadcast sub-jobs won't be cancelled.
![Screen Shot 2021-01-11 at 12 03 13 PM](https://user-images.githubusercontent.com/1853780/104150975-ab024b00-5416-11eb-8bf9-b5167bdad80a.png)
With this patch, broadcast sub-jobs will be cancelled.
![Screen Shot 2021-01-11 at 11 43 40 AM](https://user-images.githubusercontent.com/1853780/104150994-be151b00-5416-11eb-80ff-313d423c8a2e.png)
Closes #31119 from LantaoJin/SPARK-34064.
Authored-by: LantaoJin <ji...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 0c5fee2..c322d5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -24,7 +24,7 @@ import scala.concurrent.{ExecutionContext, Promise}
import scala.concurrent.duration.NANOSECONDS
import scala.util.control.NonFatal
-import org.apache.spark.{broadcast, SparkException}
+import org.apache.spark.{broadcast, SparkContext, SparkException}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -74,7 +74,10 @@ case class BroadcastExchangeExec(
child: SparkPlan) extends BroadcastExchangeLike {
import BroadcastExchangeExec._
- override val runId: UUID = UUID.randomUUID
+ // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+ // its related broadcast sub-jobs. So set the run id to job group id if exists.
+ override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+ .map(UUID.fromString).getOrElse(UUID.randomUUID)
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),