You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/08 11:33:41 UTC

[GitHub] [spark] MaxGekk commented on a diff in pull request #35715: [SPARK-37753][SQL] Fine tune logic to demote Broadcast hash join in DynamicJoinSelection

MaxGekk commented on code in PR #35715:
URL: https://github.com/apache/spark/pull/35715#discussion_r892247619


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala:
##########
@@ -683,6 +683,41 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+  test("SPARK-37753: Allow changing outer join to broadcast join even if too many empty" +
+    " partitions on broadcast side") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
+      // `testData` is small enough to be broadcast but has empty partition ratio over the config.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          "SELECT * FROM (select * from testData where value = '1') td" +
+            " right outer join testData2 ON key = a")
+        val smj = findTopLevelSortMergeJoin(plan)
+        assert(smj.size == 1)
+        val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+        assert(bhj.size == 1)
+      }
+    }
+  }
+
+  test("SPARK-37753: Inhibit broadcast in left outer join when there are many empty" +

Review Comment:
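   The test is flaky: it fails intermittently. In the failing run, the assertion at AdaptiveQueryExecSuite.scala:718 expected no broadcast hash join, but the adaptive plan still contains a `LeftOuter`, `BuildRight` `BroadcastHashJoin`. See https://github.com/apache/spark/runs/6788261561?check_suite_focus=true: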
   ```
   [info] - SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side *** FAILED *** (230 milliseconds)
   [info]   ArrayBuffer(BroadcastHashJoin [key#116713], [a#116723], LeftOuter, BuildRight, false
   [info]   :- AQEShuffleRead local
   [info]   :  +- ShuffleQueryStage 0
   [info]   :     +- Exchange hashpartitioning(key#116713, 5), ENSURE_REQUIREMENTS, [id=#264946]
   [info]   :        +- *(1) Filter (isnotnull(value#116714) AND (value#116714 = 1))
   [info]   :           +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#116713, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false, true) AS value#116714]
   [info]   :              +- Scan[obj#116712]
   [info]   +- BroadcastQueryStage 2
   [info]      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#264986]
   [info]         +- AQEShuffleRead local
   [info]            +- ShuffleQueryStage 1
   [info]               +- Exchange hashpartitioning(a#116723, 5), ENSURE_REQUIREMENTS, [id=#264965]
   [info]                  +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#116723, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#116724]
   [info]                     +- Scan[obj#116722]
   [info]   ) was not empty (AdaptiveQueryExecSuite.scala:718)
   [info]   org.scalatest.exceptions.TestFailedException:
   [info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
   [info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
   [info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
   [info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
   [info]   at org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite.$anonfun$new$59(AdaptiveQueryExecSuite.scala:718)
   [info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
   [info]   at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org