You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/27 13:47:30 UTC

[spark] branch branch-3.3 updated: [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 39413db6f01 [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing
39413db6f01 is described below

commit 39413db6f011b75504bb952423f3a5b579b36d97
Author: mcdull-zhang <wo...@163.com>
AuthorDate: Mon Jun 27 21:46:53 2022 +0800

    [SPARK-37753][FOLLOWUP][SQL] Fix unit tests sometimes failing
    
    ### What changes were proposed in this pull request?
    This unit test sometimes fails to run. for example, https://github.com/apache/spark/pull/35715#discussion_r892247619
    
    When the left side is completed first, and then the right side is completed, since it is known that there are many empty partitions on the left side, the broadcast on the right side is demoted.
    
    However, if the right side is completed first and the left side is still being executed, the right side does not know whether there are many empty partitions on the left side, so there is no demote, and then the right side is broadcast in the planning stage.
    
    This PR does this:
    When it is found that the other side is QueryStage, if the QueryStage has not been materialized, demote it first. When the other side is completed, judge again whether demote is needed.
    
    ### Why are the changes needed?
    Fix small problems in logic
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    manual testing
    
    Closes #36966 from mcdull-zhang/wait_other_side.
    
    Authored-by: mcdull-zhang <wo...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 8c8801cf501ddbdeb4a4a869bc27c8a2331531fe)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../adaptive/AdaptiveQueryExecSuite.scala          | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 67dc359ab8f..dd727855ce2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
@@ -710,18 +711,20 @@ class AdaptiveQueryExecSuite
 
   test("SPARK-37753: Inhibit broadcast in left outer join when there are many empty" +
     " partitions on outer/left side") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
-      // `testData` is small enough to be broadcast but has empty partition ratio over the config.
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
-        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
-          "SELECT * FROM (select * from testData where value = '1') td" +
-            " left outer join testData2 ON key = a")
-        val smj = findTopLevelSortMergeJoin(plan)
-        assert(smj.size == 1)
-        val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
-        assert(bhj.isEmpty)
+    eventually(timeout(15.seconds), interval(500.milliseconds)) {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
+        // `testData` is small enough to be broadcast but has empty partition ratio over the config.
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM (select * from testData where value = '1') td" +
+              " left outer join testData2 ON key = a")
+          val smj = findTopLevelSortMergeJoin(plan)
+          assert(smj.size == 1)
+          val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+          assert(bhj.isEmpty)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org