You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/05 13:17:01 UTC

[spark] branch branch-3.2 updated: [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1785ead  [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE
1785ead is described below

commit 1785ead733d3c8bd6511f2dff231b0c5de53e270
Author: Kent Yao <ya...@apache.org>
AuthorDate: Thu Aug 5 21:15:35 2021 +0800

    [SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE
    
    ### What changes were proposed in this pull request?
    
    This reverts SPARK-31475, as there are always more concurrent jobs running in AQE mode, especially when running multiple queries at the same time. Currently, the broadcast timeout does not record accurately for the BroadcastQueryStageExec only, but also including the time waiting for being scheduled. If all the resources are currently being occupied for materializing other stages, it timeouts without a chance to run actually.
    
     
    
    ![image](https://user-images.githubusercontent.com/8326978/128169612-4c96c8f6-6f8e-48ed-8eaf-450f87982c3b.png)
    
     
    
    The default value is 300s, and it's hard to adjust the timeout for AQE mode. Usually, you need an extremely large number for real-world cases. As you can see in the example, above, the timeout we used for it was 1800s, and obviously, it needed 3x more or something
    
     
    
    ### Why are the changes needed?
    
    AQE is default now, we can make it more stable with this PR
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, broadcast timeout now is not used for AQE
    
    ### How was this patch tested?
    
    modified test
    
    Closes #33636 from yaooqinn/SPARK-36414.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 0c94e47aecab0a8c346e1a004686d1496a9f2b07)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/adaptive/QueryStageExec.scala    | 26 ++--------------------
 .../sql/execution/joins/BroadcastJoinSuite.scala   | 10 +++++----
 2 files changed, 8 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index f308829..e2f763e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -17,10 +17,9 @@
 
 package org.apache.spark.sql.execution.adaptive
 
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
-import scala.concurrent.{Future, Promise}
+import scala.concurrent.Future
 
 import org.apache.spark.{FutureAction, MapOutputStatistics}
 import org.apache.spark.broadcast.Broadcast
@@ -29,11 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ThreadUtils
 
 /**
  * A query stage is an independent subgraph of the query plan. Query stage materializes its output
@@ -221,22 +218,8 @@ case class BroadcastQueryStageExec(
       throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}")
   }
 
-  @transient private lazy val materializeWithTimeout = {
-    val broadcastFuture = broadcast.submitBroadcastJob
-    val timeout = conf.broadcastTimeout
-    val promise = Promise[Any]()
-    val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new Runnable() {
-      override def run(): Unit = {
-        promise.tryFailure(QueryExecutionErrors.executeBroadcastTimeoutError(timeout, None))
-      }
-    }, timeout, TimeUnit.SECONDS)
-    broadcastFuture.onComplete(_ => fail.cancel(false))(AdaptiveSparkPlanExec.executionContext)
-    Future.firstCompletedOf(
-      Seq(broadcastFuture, promise.future))(AdaptiveSparkPlanExec.executionContext)
-  }
-
   override def doMaterialize(): Future[Any] = {
-    materializeWithTimeout
+    broadcast.submitBroadcastJob
   }
 
   override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = {
@@ -257,8 +240,3 @@ case class BroadcastQueryStageExec(
 
   override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
 }
-
-object BroadcastQueryStageExec {
-  private val scheduledExecutor =
-    ThreadUtils.newDaemonSingleThreadScheduledExecutor("BroadcastStageTimeout")
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 83163cf..dd6a412 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -415,15 +415,17 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
 
   test("Broadcast timeout") {
     val timeout = 5
-    val slowUDF = udf({ x: Int => Thread.sleep(timeout * 10 * 1000); x })
+    val slowUDF = udf({ x: Int => Thread.sleep(timeout * 1000); x })
     val df1 = spark.range(10).select($"id" as 'a)
     val df2 = spark.range(5).select(slowUDF($"id") as 'a)
     val testDf = df1.join(broadcast(df2), "a")
     withSQLConf(SQLConf.BROADCAST_TIMEOUT.key -> timeout.toString) {
-      val e = intercept[Exception] {
-        testDf.collect()
+      if (!conf.adaptiveExecutionEnabled) {
+        val e = intercept[Exception] {
+          testDf.collect()
+        }
+        assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
       }
-      assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org