You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/12/17 08:47:10 UTC
[spark] branch branch-2.4 updated: [SPARK-33733][SQL][2.4]
PullOutNondeterministic should check and collect deterministic field
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 399b2ca [SPARK-33733][SQL][2.4] PullOutNondeterministic should check and collect deterministic field
399b2ca is described below
commit 399b2cafe3fcec17d40c4ccfd863848aa43d7da4
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Dec 17 08:43:43 2020 +0000
[SPARK-33733][SQL][2.4] PullOutNondeterministic should check and collect deterministic field
backport [#30703](https://github.com/apache/spark/pull/30703) for branch-2.4.
### What changes were proposed in this pull request?
The deterministic field is wider than `NonDeterministic`, so we should keep the same range between pull out and check analysis.
### Why are the changes needed?
For example
```
select * from values(1), (4) as t(c1) order by java_method('java.lang.Math', 'abs', c1)
```
We will get an exception since `java_method`'s deterministic field is false but it is not a `NonDeterministic`
```
Exception in thread "main" org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
java_method('java.lang.Math', 'abs', t.`c1`) ASC NULLS FIRST
in operator Sort [java_method(java.lang.Math, abs, c1#1) ASC NULLS FIRST], true
;;
```
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Add test.
Closes #30772 from ulysses-you/SPARK-33733-branch-2.4.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++++-
.../expressions/CallMethodViaReflection.scala | 6 +++---
.../sql/catalyst/analysis/AnalysisSuite.scala | 22 ++++++++++++++++++++++
3 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dc40a6a..5ad2e7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2260,7 +2260,10 @@ class Analyzer(
private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = {
exprs.filterNot(_.deterministic).flatMap { expr =>
- val leafNondeterministic = expr.collect { case n: Nondeterministic => n }
+ val leafNondeterministic = expr.collect {
+ case n: Nondeterministic => n
+ case udf: UserDefinedExpression if !udf.deterministic => udf
+ }
leafNondeterministic.distinct.map { e =>
val ne = e match {
case n: NamedExpression => n
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
index 65bb9a8..0966fcb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala
@@ -53,7 +53,7 @@ import org.apache.spark.util.Utils
a5cf6c42-0c85-418f-af6c-3e4e5b1328f2
""")
case class CallMethodViaReflection(children: Seq[Expression])
- extends Expression with CodegenFallback {
+ extends Nondeterministic with CodegenFallback {
override def prettyName: String = "reflect"
@@ -76,11 +76,11 @@ case class CallMethodViaReflection(children: Seq[Expression])
}
}
- override lazy val deterministic: Boolean = false
override def nullable: Boolean = true
override val dataType: DataType = StringType
+ override protected def initializeInternal(partitionIndex: Int): Unit = {}
- override def eval(input: InternalRow): Any = {
+ override protected def evalInternal(input: InternalRow): Any = {
var i = 0
while (i < argExprs.length) {
buffer(i) = argExprs(i).eval(input).asInstanceOf[Object]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8eaba32..4c4ba11 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -657,4 +657,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
Seq("Intersect can only be performed on tables with the compatible column types. " +
"timestamp <> double at the second column of the second table"))
}
+
+ test("SPARK-33733: PullOutNondeterministic should check and collect deterministic field") {
+ val reflect =
+ CallMethodViaReflection(Seq("java.lang.Math", "abs", testRelation.output.head))
+ val udf = ScalaUDF(
+ (s: String) => s,
+ StringType,
+ Literal.create(null, StringType) :: Nil,
+ true :: Nil,
+ udfDeterministic = false)
+
+ Seq(reflect, udf).foreach { e: Expression =>
+ val plan = Sort(Seq(e.asc), false, testRelation)
+ val projected = Alias(e, "_nondeterministic")()
+ val expect =
+ Project(testRelation.output,
+ Sort(Seq(projected.toAttribute.asc), false,
+ Project(testRelation.output :+ projected,
+ testRelation)))
+ checkAnalysis(plan, expect)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org