You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/02/05 20:38:29 UTC
[spark] branch branch-3.0 updated: [SPARK-30721][SQL][TESTS] Fix
DataFrameAggregateSuite when enabling AQE
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d06a9df [SPARK-30721][SQL][TESTS] Fix DataFrameAggregateSuite when enabling AQE
d06a9df is described below
commit d06a9dfd9098b704c75047161590bb4a32b25286
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Feb 5 12:36:51 2020 -0800
[SPARK-30721][SQL][TESTS] Fix DataFrameAggregateSuite when enabling AQE
### What changes were proposed in this pull request?
Update `DataFrameAggregateSuite` so that it passes with adaptive query execution (AQE) enabled.
### Why are the changes needed?
We don't need to turn off AQE in `DataFrameAggregateSuite`.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
Ran `DataFrameAggregateSuite` locally with AQE enabled.
Closes #27451 from cloud-fan/aqe-test.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 3b26f807a0eb0e59c5123c3f1e2262b712800c0f)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/sql/DataFrameAggregateSuite.scala | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index dc1767a..d7df75f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -615,34 +615,33 @@ class DataFrameAggregateSuite extends QueryTest
Seq((true, true), (true, false), (false, true), (false, false))) {
withSQLConf(
(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
- (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString),
- (SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false")) {
- // When enable AQE, the WholeStageCodegenExec is added during QueryStageExec.
+ (SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {
val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")
// test case for HashAggregate
val hashAggDF = df.groupBy("x").agg(c, sum("y"))
+ hashAggDF.collect()
val hashAggPlan = hashAggDF.queryExecution.executedPlan
if (wholeStage) {
- assert(hashAggPlan.find {
+ assert(find(hashAggPlan) {
case WholeStageCodegenExec(_: HashAggregateExec) => true
case _ => false
}.isDefined)
} else {
- assert(hashAggPlan.isInstanceOf[HashAggregateExec])
+ assert(stripAQEPlan(hashAggPlan).isInstanceOf[HashAggregateExec])
}
- hashAggDF.collect()
// test case for ObjectHashAggregate and SortAggregate
val objHashAggOrSortAggDF = df.groupBy("x").agg(c, collect_list("y"))
- val objHashAggOrSortAggPlan = objHashAggOrSortAggDF.queryExecution.executedPlan
+ objHashAggOrSortAggDF.collect()
+ val objHashAggOrSortAggPlan =
+ stripAQEPlan(objHashAggOrSortAggDF.queryExecution.executedPlan)
if (useObjectHashAgg) {
assert(objHashAggOrSortAggPlan.isInstanceOf[ObjectHashAggregateExec])
} else {
assert(objHashAggOrSortAggPlan.isInstanceOf[SortAggregateExec])
}
- objHashAggOrSortAggDF.collect()
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org