Posted to commits@spark.apache.org by do...@apache.org on 2021/02/23 21:41:50 UTC

[spark] branch master updated: [SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 95e45c6  [SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase
95e45c6 is described below

commit 95e45c6257a614754e132f92b7b7239573d42b7a
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Feb 23 13:41:24 2021 -0800

    [SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase
    
    ### What changes were proposed in this pull request?
    
    A few minor improvements for `DynamicPartitionPruningSuiteBase`:
    - Replace the hand-rolled `adaptiveExecutionOn` flag with the shared
      `EnableAdaptiveExecutionSuite`/`DisableAdaptiveExecutionSuite` mixins, and
      read the effective setting from `SQLConf.ADAPTIVE_EXECUTION_ENABLED` where
      a test needs to branch on it.
    - Match the `BroadcastExchangeLike` trait instead of the concrete
      `BroadcastExchangeExec`, so the broadcast-reuse check also covers other
      exchange implementations.
    - Use `find` from `AdaptiveSparkPlanHelper` instead of `plan.find`, so the
      traversal descends into adaptive query stages.
    (A sketch of the mixin pattern follows the diffstat below.)
    
    ### Why are the changes needed?
    
    Code cleanup.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #31625 from cloud-fan/followup.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/DynamicPartitionPruningSuite.scala   | 27 ++++++++--------------
 1 file changed, 10 insertions(+), 17 deletions(-)
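
For context, the heart of the cleanup is swapping a suite-level boolean for the
shared AQE test mixins. Below is a minimal sketch of that pattern; the trait and
class names (`EnableAQE`, `DisableAQE`, `PruningSuiteBase`) and the trait bodies
are simplified assumptions for illustration, not the actual `AdaptiveTestUtils`
source.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical, simplified stand-ins for EnableAdaptiveExecutionSuite /
// DisableAdaptiveExecutionSuite: toggle AQE once for the whole suite.
trait EnableAQE extends SharedSparkSession {
  override protected def sparkConf: SparkConf =
    super.sparkConf
      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
      .set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
}

trait DisableAQE extends SharedSparkSession {
  override protected def sparkConf: SparkConf =
    super.sparkConf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
}

abstract class PruningSuiteBase extends QueryTest with SharedSparkSession {
  // Tests branch on the session conf instead of an abstract boolean field,
  // mirroring the conf.getConf(...) change in the diff below.
  protected def aqeOn: Boolean = conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
}

// Each concrete suite just mixes in the toggle it wants.
class PruningSuiteAEOn extends PruningSuiteBase with EnableAQE
class PruningSuiteAEOff extends PruningSuiteBase with DisableAQE
```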

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cd7c441..bc9c300 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
 import org.apache.spark.sql.catalyst.plans.ExistenceJoin
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, BroadcastQueryStageExec, DisableAdaptiveExecution, EliminateJoinToEmptyRelation}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.functions._
@@ -44,14 +44,9 @@ abstract class DynamicPartitionPruningSuiteBase
 
   import testImplicits._
 
-  val adaptiveExecutionOn: Boolean
-
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, adaptiveExecutionOn)
-    spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY, true)
-
     val factData = Seq[(Int, Int, Int, Int)](
       (1000, 1, 1, 10),
       (1010, 2, 1, 10),
@@ -195,8 +190,8 @@ abstract class DynamicPartitionPruningSuiteBase
     subqueryBroadcast.foreach { s =>
       s.child match {
         case _: ReusedExchangeExec => // reuse check ok.
-        case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) =>
-        case b: BroadcastExchangeExec =>
+        case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
+        case b: BroadcastExchangeLike =>
           val hasReuse = plan.find {
             case ReusedExchangeExec(_, e) => e eq b
             case _ => false
@@ -337,7 +332,7 @@ abstract class DynamicPartitionPruningSuiteBase
 
         def getFactScan(plan: SparkPlan): SparkPlan = {
           val scanOption =
-            plan.find {
+            find(plan) {
               case s: FileSourceScanExec =>
                 s.output.exists(_.find(_.argString(maxFields = 100).contains("fid")).isDefined)
               case _ => false
@@ -1261,7 +1256,7 @@ abstract class DynamicPartitionPruningSuiteBase
       val countSubqueryBroadcasts =
         collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
 
-      if (adaptiveExecutionOn) {
+      if (conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
         val countReusedSubqueryBroadcasts =
           collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
 
@@ -1390,10 +1385,8 @@ abstract class DynamicPartitionPruningSuiteBase
   }
 }
 
-class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
-  override val adaptiveExecutionOn: Boolean = false
-}
+class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
+  with DisableAdaptiveExecutionSuite
 
-class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase {
-  override val adaptiveExecutionOn: Boolean = true
-}
+class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase
+  with EnableAdaptiveExecutionSuite
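
A note on the `BroadcastExchangeLike` and `find(plan)` changes above: under AQE
the relevant exchange may not be a bare `BroadcastExchangeExec`, and a plain
`plan.find` does not descend into adaptive query stages, while the `find`
inherited from `AdaptiveSparkPlanHelper` does. Below is a simplified
restatement of the suite's reuse check; the wrapper object `ReuseCheck` is
hypothetical and not part of the commit, and the `BroadcastQueryStageExec` case
from the diff is omitted for brevity.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}

// Hypothetical wrapper, for illustration only.
object ReuseCheck extends AdaptiveSparkPlanHelper {
  // Returns true if the subquery broadcast's child either reuses an exchange
  // from the main plan directly, or is itself a broadcast exchange that some
  // ReusedExchangeExec elsewhere in the plan points back to.
  def broadcastIsReused(plan: SparkPlan, child: SparkPlan): Boolean = child match {
    case _: ReusedExchangeExec => true
    case b: BroadcastExchangeLike =>
      // find() from AdaptiveSparkPlanHelper also searches inside query stages.
      find(plan) {
        case ReusedExchangeExec(_, e) => e eq b
        case _ => false
      }.isDefined
    case _ => false
  }
}
```

Matching on the `BroadcastExchangeLike` trait rather than the concrete
`BroadcastExchangeExec` keeps this check valid for any broadcast exchange
implementation, including those produced when adaptive execution is enabled.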

