Posted to commits@spark.apache.org by we...@apache.org on 2021/11/05 12:20:06 UTC
[spark] branch master updated: [SPARK-37212][SQL] Improve the implementation of aggregate pushdown
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d563623 [SPARK-37212][SQL] Improve the implementation of aggregate pushdown
d563623 is described below
commit d56362392e22ecdf0c0d01c7b233e20fb6e65607
Author: Jiaan Geng <be...@163.com>
AuthorDate: Fri Nov 5 20:19:12 2021 +0800
[SPARK-37212][SQL] Improve the implementation of aggregate pushdown
### What changes were proposed in this pull request?
Spark SQL supports aggregate pushdown for JDBC. While reading the current implementation, I found a few small issues.
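For context, here is a minimal sketch of the feature this PR refactors. The `pushDownAggregate` option is the real JDBC data source option that enables this path; the connection URL and table name below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min}

val spark = SparkSession.builder()
  .appName("agg-pushdown-sketch")
  .master("local[*]")
  .getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb")   // hypothetical connection URL
  .option("dbtable", "employee")         // hypothetical table name
  .option("pushDownAggregate", "true")   // opt in to aggregate pushdown
  .load()

// MIN/MAX/COUNT/SUM aggregates without a FILTER clause are pushdown candidates.
df.groupBy("dept").agg(max("salary"), min("bonus")).show()
```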
### Why are the changes needed?
Make variable names and comments more accurate, and simplify the code.
### Does this PR introduce _any_ user-facing change?
'No'.
### How was this patch tested?
Jenkins tests.
Closes #34488 from beliefer/SPARK-37212.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/datasources/DataSourceStrategy.scala | 12 ++++++------
.../execution/datasources/v2/PushDownUtils.scala | 22 +++++++++-------------
.../datasources/v2/V2ScanRelationPushDown.scala | 4 ++--
3 files changed, 17 insertions(+), 21 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2325597..6febbd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -702,23 +702,23 @@ object DataSourceStrategy
(nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
}
- protected[sql] def translateAggregate(aggregates: AggregateExpression): Option[AggregateFunc] = {
- if (aggregates.filter.isEmpty) {
- aggregates.aggregateFunction match {
+ protected[sql] def translateAggregate(agg: AggregateExpression): Option[AggregateFunc] = {
+ if (agg.filter.isEmpty) {
+ agg.aggregateFunction match {
case aggregate.Min(PushableColumnWithoutNestedColumn(name)) =>
Some(new Min(FieldReference(name)))
case aggregate.Max(PushableColumnWithoutNestedColumn(name)) =>
Some(new Max(FieldReference(name)))
case count: aggregate.Count if count.children.length == 1 =>
count.children.head match {
- // SELECT COUNT(*) FROM table is translated to SELECT 1 FROM table
+ // COUNT(any literal) is the same as COUNT(*)
case Literal(_, _) => Some(new CountStar())
case PushableColumnWithoutNestedColumn(name) =>
- Some(new Count(FieldReference(name), aggregates.isDistinct))
+ Some(new Count(FieldReference(name), agg.isDistinct))
case _ => None
}
case sum @ aggregate.Sum(PushableColumnWithoutNestedColumn(name), _) =>
- Some(new Sum(FieldReference(name), aggregates.isDistinct))
+ Some(new Sum(FieldReference(name), agg.isDistinct))
case _ => None
}
} else {
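The reworded comment captures a SQL fact: COUNT over any literal counts every row, because a literal is never NULL, so it translates to CountStar. An illustrative sketch of that translation shape, using simplified stand-in types rather than Spark's internal classes:

```scala
// Simplified stand-ins; not Spark's internal classes.
sealed trait CatalystAgg
case class CountColumn(name: String, distinct: Boolean) extends CatalystAgg
case object CountLiteral extends CatalystAgg // e.g. COUNT(1), COUNT('x')

sealed trait V2Agg
case class Count(column: String, distinct: Boolean) extends V2Agg
case object CountStar extends V2Agg

def translate(agg: CatalystAgg): Option[V2Agg] = agg match {
  // A literal is never NULL, so COUNT(literal) counts every row: COUNT(*).
  case CountLiteral                => Some(CountStar)
  case CountColumn(name, distinct) => Some(Count(name, distinct))
}
```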
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index f837ab5..d51b475 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -112,7 +112,7 @@ object PushDownUtils extends PredicateHelper {
* @return pushed aggregation.
*/
def pushAggregates(
- scanBuilder: ScanBuilder,
+ scanBuilder: SupportsPushDownAggregates,
aggregates: Seq[AggregateExpression],
groupBy: Seq[Expression]): Option[Aggregation] = {
@@ -122,20 +122,16 @@ object PushDownUtils extends PredicateHelper {
case _ => None
}
- scanBuilder match {
- case r: SupportsPushDownAggregates if aggregates.nonEmpty =>
- val translatedAggregates = aggregates.flatMap(DataSourceStrategy.translateAggregate)
- val translatedGroupBys = groupBy.flatMap(columnAsString)
-
- if (translatedAggregates.length != aggregates.length ||
- translatedGroupBys.length != groupBy.length) {
- return None
- }
+ val translatedAggregates = aggregates.flatMap(DataSourceStrategy.translateAggregate)
+ val translatedGroupBys = groupBy.flatMap(columnAsString)
- val agg = new Aggregation(translatedAggregates.toArray, translatedGroupBys.toArray)
- Some(agg).filter(r.pushAggregation)
- case _ => None
+ if (translatedAggregates.length != aggregates.length ||
+ translatedGroupBys.length != groupBy.length) {
+ return None
}
+
+ val agg = new Aggregation(translatedAggregates.toArray, translatedGroupBys.toArray)
+ Some(agg).filter(scanBuilder.pushAggregation)
}
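The simplification keeps the `Option.filter` idiom from the original code: `pushAggregation` both attempts the pushdown (a side effect on the scan builder) and reports whether it succeeded, so the result is `Some(agg)` only when the source accepted it. A toy sketch with a hypothetical `Builder` stand-in:

```scala
// Hypothetical stand-in for a scan builder that supports pushdown.
class Builder {
  private var pushed: Option[String] = None
  def pushAggregation(agg: String): Boolean = {
    pushed = Some(agg) // side effect: remember what was pushed
    true               // pretend the source accepted the aggregation
  }
}

val builder = new Builder
val agg = "MAX(salary) GROUP BY dept"

// Some(agg) if pushAggregation returned true, None otherwise.
val result: Option[String] = Some(agg).filter(builder.pushAggregation)
```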
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index f73f831..36923d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -85,7 +85,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
if filters.isEmpty && project.forall(_.isInstanceOf[AttributeReference]) =>
sHolder.builder match {
- case _: SupportsPushDownAggregates =>
+ case r: SupportsPushDownAggregates =>
val aggExprToOutputOrdinal = mutable.HashMap.empty[Expression, Int]
var ordinal = 0
val aggregates = resultExpressions.flatMap { expr =>
@@ -105,7 +105,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
val normalizedGroupingExpressions = DataSourceStrategy.normalizeExprs(
groupingExpressions, sHolder.relation.output)
val pushedAggregates = PushDownUtils.pushAggregates(
- sHolder.builder, normalizedAggregates, normalizedGroupingExpressions)
+ r, normalizedAggregates, normalizedGroupingExpressions)
if (pushedAggregates.isEmpty) {
aggNode // return original plan node
} else {
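Binding the match with `case r: SupportsPushDownAggregates =>` instead of a wildcard narrows the type once and lets the bound value be passed straight to `pushAggregates`, avoiding a later re-match or cast of `sHolder.builder`. A minimal sketch of the idiom, with simplified stand-in traits:

```scala
// Simplified stand-ins for the real interfaces.
trait ScanBuilder
trait SupportsPushDownAggregates extends ScanBuilder {
  def pushAggregation(agg: String): Boolean
}

def applyPushdown(builder: ScanBuilder): Unit = builder match {
  case r: SupportsPushDownAggregates =>
    // r already has the narrowed type; no second match or cast needed.
    r.pushAggregation("MAX(salary)")
  case _ => () // source cannot push aggregates; keep the original plan
}
```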