You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/27 13:47:20 UTC

[spark] branch master updated: [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8801cf501 [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing
8c8801cf501 is described below

commit 8c8801cf501ddbdeb4a4a869bc27c8a2331531fe
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Mon Jun 27 21:46:53 2022 +0800

    [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing
    
    ### What changes were proposed in this pull request?
    This unit test sometimes fails to run. for example, https://github.com/apache/spark/pull/35715#discussion_r892247619
    
    When the left side is completed first, and then the right side is completed, since it is known that there are many empty partitions on the left side, the broadcast on the right side is demoted.
    
    However, if the right side is completed first and the left side is still being executed, the right side does not know whether there are many empty partitions on the left side, so there is no demote, and then the right side is broadcast in the planning stage.
    
    This PR does this:
    When it is found that the other side is QueryStage, if the QueryStage has not been materialized, demote it first. When the other side is completed, judge again whether demote is needed.
    
    ### Why are the changes needed?
    Fix small problems in logic
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    manual testing
    
    Closes #36966 from mcdull-zhang/wait_other_side.
    
    Authored-by: mcdull-zhang <wo...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../adaptive/AdaptiveQueryExecSuite.scala          | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 7b7afe40590..a6aa0da2692 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
@@ -710,18 +711,20 @@ class AdaptiveQueryExecSuite
 
   test("SPARK-37753: Inhibit broadcast in left outer join when there are many empty" +
     " partitions on outer/left side") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
-      // `testData` is small enough to be broadcast but has empty partition ratio over the config.
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
-        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM (select * from testData where value = '1') td" +
-            " left outer join testData2 ON key = a")
-        val smj = findTopLevelSortMergeJoin(plan)
-        assert(smj.size == 1)
-        val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
-        assert(bhj.isEmpty)
+    eventually(timeout(15.seconds), interval(500.milliseconds)) {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
+        // `testData` is small enough to be broadcast but has empty partition ratio over the config.
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM (select * from testData where value = '1') td" +
+              " left outer join testData2 ON key = a")
+          val smj = findTopLevelSortMergeJoin(plan)
+          assert(smj.size == 1)
+          val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+          assert(bhj.isEmpty)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org