You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/07/20 06:39:01 UTC
[spark] branch branch-2.4 updated: [SPARK-32344][SQL][2.4]
Unevaluable expr is set to FIRST/LAST ignoreNullsExpr in distinct
aggregates
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 58c637a [SPARK-32344][SQL][2.4] Unevaluable expr is set to FIRST/LAST ignoreNullsExpr in distinct aggregates
58c637a is described below
commit 58c637a213e122156b914c27d51e5eb1ec4e0b4c
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sun Jul 19 23:35:44 2020 -0700
[SPARK-32344][SQL][2.4] Unevaluable expr is set to FIRST/LAST ignoreNullsExpr in distinct aggregates
### What changes were proposed in this pull request?
This PR intends to fix a bug of distinct FIRST/LAST aggregates in v2.4.6;
```
scala> sql("SELECT FIRST(DISTINCT v) FROM VALUES 1, 2, 3 t(v)").show()
...
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: false#37
at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:258)
at org.apache.spark.sql.catalyst.expressions.AttributeReference.eval(namedExpressions.scala:226)
at org.apache.spark.sql.catalyst.expressions.aggregate.First.ignoreNulls(First.scala:68)
at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions$lzycompute(First.scala:82)
at org.apache.spark.sql.catalyst.expressions.aggregate.First.updateExpressions(First.scala:81)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$15.apply(HashAggregateExec.scala:268)
```
A root cause of this bug is that the `Aggregation` strategy replaces a foldable boolean `ignoreNullsExpr` expr with an `Unevaluable` expr (`AttributeReference`) for distinct FIRST/LAST aggregate functions. But, this operation cannot be allowed because the `Analyzer` has checked that it must be foldable;
https://github.com/apache/spark/blob/ffdbbae1d465fe2c710d020de62ca1a6b0b924d9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala#L74-L76
So, this PR proposes to change a variable for `IGNORE NULLS` from `Expression` to `Boolean` to avoid the case.
This is the backport of https://github.com/apache/spark/pull/29143.
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a test in `DataFrameAggregateSuite`.
Closes #29157 from maropu/SPARK-32344-BRANCH2.4.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++--
.../sql/catalyst/expressions/aggregate/First.scala | 26 ++++++++++++++--------
.../sql/catalyst/expressions/aggregate/Last.scala | 17 +++++++-------
.../optimizer/RewriteDistinctAggregates.scala | 2 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 4 ++--
...astTestSuite.scala => FirstLastTestSuite.scala} | 18 ++++++++++++---
.../catalyst/parser/ExpressionParserSuite.scala | 8 +++----
.../scala/org/apache/spark/sql/functions.scala | 4 ++--
.../apache/spark/sql/DataFrameAggregateSuite.scala | 7 ++++++
9 files changed, 58 insertions(+), 32 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 43ab651..afe7b4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -681,9 +681,9 @@ class Analyzer(
// AggregateFunction's with the exception of First and Last in their default mode
// (which we handle) and possibly some Hive UDAF's.
case First(expr, _) =>
- First(ifExpr(expr), Literal(true))
+ First(ifExpr(expr), true)
case Last(expr, _) =>
- Last(ifExpr(expr), Literal(true))
+ Last(ifExpr(expr), true)
case a: AggregateFunction =>
a.withNewChildren(a.children.map(ifExpr))
}.transform {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala
index f51bfd5..d203277 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
@@ -35,12 +36,16 @@ import org.apache.spark.sql.types._
_FUNC_(expr[, isIgnoreNull]) - Returns the first value of `expr` for a group of rows.
If `isIgnoreNull` is true, returns only non-null values.
""")
-case class First(child: Expression, ignoreNullsExpr: Expression)
+case class First(child: Expression, ignoreNulls: Boolean)
extends DeclarativeAggregate with ExpectsInputTypes {
- def this(child: Expression) = this(child, Literal.create(false, BooleanType))
+ def this(child: Expression) = this(child, false)
- override def children: Seq[Expression] = child :: ignoreNullsExpr :: Nil
+ def this(child: Expression, ignoreNullsExpr: Expression) = {
+ this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "first"))
+ }
+
+ override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = true
@@ -57,16 +62,11 @@ case class First(child: Expression, ignoreNullsExpr: Expression)
val defaultCheck = super.checkInputDataTypes()
if (defaultCheck.isFailure) {
defaultCheck
- } else if (!ignoreNullsExpr.foldable) {
- TypeCheckFailure(
- s"The second argument of First must be a boolean literal, but got: ${ignoreNullsExpr.sql}")
} else {
TypeCheckSuccess
}
}
- private def ignoreNulls: Boolean = ignoreNullsExpr.eval().asInstanceOf[Boolean]
-
private lazy val first = AttributeReference("first", child.dataType)()
private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
@@ -106,3 +106,11 @@ case class First(child: Expression, ignoreNullsExpr: Expression)
override def toString: String = s"first($child)${if (ignoreNulls) " ignore nulls"}"
}
+
+object FirstLast {
+ def validateIgnoreNullExpr(exp: Expression, funcName: String): Boolean = exp match {
+ case Literal(b: Boolean, BooleanType) => b
+ case _ => throw new AnalysisException(
+ s"The second argument in $funcName should be a boolean literal.")
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
index 2650d7b..57a62a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckSuccess
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
@@ -35,12 +35,16 @@ import org.apache.spark.sql.types._
_FUNC_(expr[, isIgnoreNull]) - Returns the last value of `expr` for a group of rows.
If `isIgnoreNull` is true, returns only non-null values.
""")
-case class Last(child: Expression, ignoreNullsExpr: Expression)
+case class Last(child: Expression, ignoreNulls: Boolean)
extends DeclarativeAggregate with ExpectsInputTypes {
- def this(child: Expression) = this(child, Literal.create(false, BooleanType))
+ def this(child: Expression) = this(child, false)
- override def children: Seq[Expression] = child :: ignoreNullsExpr :: Nil
+ def this(child: Expression, ignoreNullsExpr: Expression) = {
+ this(child, FirstLast.validateIgnoreNullExpr(ignoreNullsExpr, "last"))
+ }
+
+ override def children: Seq[Expression] = child :: Nil
override def nullable: Boolean = true
@@ -57,16 +61,11 @@ case class Last(child: Expression, ignoreNullsExpr: Expression)
val defaultCheck = super.checkInputDataTypes()
if (defaultCheck.isFailure) {
defaultCheck
- } else if (!ignoreNullsExpr.foldable) {
- TypeCheckFailure(
- s"The second argument of Last must be a boolean literal, but got: ${ignoreNullsExpr.sql}")
} else {
TypeCheckSuccess
}
}
- private def ignoreNulls: Boolean = ignoreNullsExpr.eval().asInstanceOf[Boolean]
-
private lazy val last = AttributeReference("last", child.dataType)()
private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index b946800..22eb604 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -198,7 +198,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
// Select the result of the first aggregate in the last aggregate.
val result = AggregateExpression(
- aggregate.First(evalWithinGroup(regularGroupId, operator.toAttribute), Literal(true)),
+ aggregate.First(evalWithinGroup(regularGroupId, operator.toAttribute), true),
mode = Complete,
isDistinct = false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index e2e8a45..90e7d1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1225,7 +1225,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
override def visitFirst(ctx: FirstContext): Expression = withOrigin(ctx) {
val ignoreNullsExpr = ctx.IGNORE != null
- First(expression(ctx.expression), Literal(ignoreNullsExpr)).toAggregateExpression()
+ First(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
}
/**
@@ -1233,7 +1233,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
*/
override def visitLast(ctx: LastContext): Expression = withOrigin(ctx) {
val ignoreNullsExpr = ctx.IGNORE != null
- Last(expression(ctx.expression), Literal(ignoreNullsExpr)).toAggregateExpression()
+ Last(expression(ctx.expression), ignoreNullsExpr).toAggregateExpression()
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/LastTestSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/FirstLastTestSuite.scala
similarity index 84%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/LastTestSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/FirstLastTestSuite.scala
index ba36bc0..bb6672e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/LastTestSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/FirstLastTestSuite.scala
@@ -17,14 +17,15 @@
package org.apache.spark.sql.catalyst.expressions.aggregate
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType
-class LastTestSuite extends SparkFunSuite {
+class FirstLastTestSuite extends SparkFunSuite {
val input = AttributeReference("input", IntegerType, nullable = true)()
- val evaluator = DeclarativeAggregateEvaluator(Last(input, Literal(false)), Seq(input))
- val evaluatorIgnoreNulls = DeclarativeAggregateEvaluator(Last(input, Literal(true)), Seq(input))
+ val evaluator = DeclarativeAggregateEvaluator(Last(input, false), Seq(input))
+ val evaluatorIgnoreNulls = DeclarativeAggregateEvaluator(Last(input, true), Seq(input))
test("empty buffer") {
assert(evaluator.initialize() === InternalRow(null, false))
@@ -106,4 +107,15 @@ class LastTestSuite extends SparkFunSuite {
val m1 = evaluatorIgnoreNulls.merge(p1, p2)
assert(evaluatorIgnoreNulls.eval(m1) === InternalRow(1))
}
+
+ test("SPARK-32344: correct error handling for a type mismatch") {
+ val msg1 = intercept[AnalysisException] {
+ new First(input, Literal(1, IntegerType))
+ }.getMessage
+ assert(msg1.contains("The second argument in first should be a boolean literal"))
+ val msg2 = intercept[AnalysisException] {
+ new Last(input, Literal(1, IntegerType))
+ }.getMessage
+ assert(msg2.contains("The second argument in last should be a boolean literal"))
+ }
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index 2bc41da..a7234263 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -688,9 +688,9 @@ class ExpressionParserSuite extends PlanTest {
}
test("SPARK-19526 Support ignore nulls keywords for first and last") {
- assertEqual("first(a ignore nulls)", First('a, Literal(true)).toAggregateExpression())
- assertEqual("first(a)", First('a, Literal(false)).toAggregateExpression())
- assertEqual("last(a ignore nulls)", Last('a, Literal(true)).toAggregateExpression())
- assertEqual("last(a)", Last('a, Literal(false)).toAggregateExpression())
+ assertEqual("first(a ignore nulls)", First('a, true).toAggregateExpression())
+ assertEqual("first(a)", First('a, false).toAggregateExpression())
+ assertEqual("last(a ignore nulls)", Last('a, true).toAggregateExpression())
+ assertEqual("last(a)", Last('a, false).toAggregateExpression())
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 21ad1fd..4270171 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -455,7 +455,7 @@ object functions {
* @since 2.0.0
*/
def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
- new First(e.expr, Literal(ignoreNulls))
+ new First(e.expr, ignoreNulls)
}
/**
@@ -580,7 +580,7 @@ object functions {
* @since 2.0.0
*/
def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
- new Last(e.expr, Literal(ignoreNulls))
+ new Last(e.expr, ignoreNulls)
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index bb7c68a..86a1086 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -743,4 +743,11 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext {
"grouping expressions: [current_date(None)], value: [key: int, value: string], " +
"type: GroupBy]"))
}
+
+ test("SPARK-32344: Unevaluable's set to FIRST/LAST ignoreNullsExpr in distinct aggregates") {
+ val queryTemplate = (agg: String) =>
+ s"SELECT $agg(DISTINCT v) FROM (SELECT v FROM VALUES 1, 2, 3 t(v) ORDER BY v)"
+ checkAnswer(sql(queryTemplate("FIRST")), Row(1))
+ checkAnswer(sql(queryTemplate("LAST")), Row(3))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org