You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/02/23 21:41:50 UTC
[spark] branch master updated: [SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 95e45c6 [SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase
95e45c6 is described below
commit 95e45c6257a614754e132f92b7b7239573d42b7a
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Feb 23 13:41:24 2021 -0800
[SPARK-34168][SQL][FOLLOWUP] Improve DynamicPartitionPruningSuiteBase
### What changes were proposed in this pull request?
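A few minor improvements for `DynamicPartitionPruningSuiteBase`: remove the abstract `adaptiveExecutionOn` flag and the manual adaptive-execution conf setup in `beforeAll()`, and instead mix `DisableAdaptiveExecutionSuite` / `EnableAdaptiveExecutionSuite` into the concrete suites; match `BroadcastExchangeLike` instead of `BroadcastExchangeExec` in the exchange-reuse check; call `find(plan)` instead of `plan.find` in `getFactScan`; and read `SQLConf.ADAPTIVE_EXECUTION_ENABLED` from `conf` where the test previously consulted `adaptiveExecutionOn`.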
### Why are the changes needed?
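Code cleanup: the suites now use the `DisableAdaptiveExecutionSuite` / `EnableAdaptiveExecutionSuite` mixins instead of setting the adaptive-execution configs by hand in `beforeAll()`.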
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #31625 from cloud-fan/followup.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/sql/DynamicPartitionPruningSuite.scala | 27 ++++++++--------------
1 file changed, 10 insertions(+), 17 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index cd7c441..bc9c300 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expr
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.plans.ExistenceJoin
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, BroadcastQueryStageExec, DisableAdaptiveExecution, EliminateJoinToEmptyRelation}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.functions._
@@ -44,14 +44,9 @@ abstract class DynamicPartitionPruningSuiteBase
import testImplicits._
- val adaptiveExecutionOn: Boolean
-
override def beforeAll(): Unit = {
super.beforeAll()
- spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED, adaptiveExecutionOn)
- spark.sessionState.conf.setConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY, true)
-
val factData = Seq[(Int, Int, Int, Int)](
(1000, 1, 1, 10),
(1010, 2, 1, 10),
@@ -195,8 +190,8 @@ abstract class DynamicPartitionPruningSuiteBase
subqueryBroadcast.foreach { s =>
s.child match {
case _: ReusedExchangeExec => // reuse check ok.
- case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) =>
- case b: BroadcastExchangeExec =>
+ case BroadcastQueryStageExec(_, _: ReusedExchangeExec, _) => // reuse check ok.
+ case b: BroadcastExchangeLike =>
val hasReuse = plan.find {
case ReusedExchangeExec(_, e) => e eq b
case _ => false
@@ -337,7 +332,7 @@ abstract class DynamicPartitionPruningSuiteBase
def getFactScan(plan: SparkPlan): SparkPlan = {
val scanOption =
- plan.find {
+ find(plan) {
case s: FileSourceScanExec =>
s.output.exists(_.find(_.argString(maxFields = 100).contains("fid")).isDefined)
case _ => false
@@ -1261,7 +1256,7 @@ abstract class DynamicPartitionPruningSuiteBase
val countSubqueryBroadcasts =
collectWithSubqueries(plan)({ case _: SubqueryBroadcastExec => 1 }).sum
- if (adaptiveExecutionOn) {
+ if (conf.getConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)) {
val countReusedSubqueryBroadcasts =
collectWithSubqueries(plan)({ case ReusedSubqueryExec(_: SubqueryBroadcastExec) => 1}).sum
@@ -1390,10 +1385,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
-class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase {
- override val adaptiveExecutionOn: Boolean = false
-}
+class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
+ with DisableAdaptiveExecutionSuite
-class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase {
- override val adaptiveExecutionOn: Boolean = true
-}
+class DynamicPartitionPruningSuiteAEOn extends DynamicPartitionPruningSuiteBase
+ with EnableAdaptiveExecutionSuite
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org