You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/31 21:52:51 UTC

spark git commit: [SPARK-23281][SQL] Query produces results in incorrect order when a composite order by clause refers to both original columns and aliases

Repository: spark
Updated Branches:
  refs/heads/master dd242bad3 -> 9ff1d96f0


[SPARK-23281][SQL] Query produces results in incorrect order when a composite order by clause refers to both original columns and aliases

## What changes were proposed in this pull request?
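Here is a test snippet that reproduces the problem. The query asks for `ORDER BY value desc, key`, i.e. `key` ascending, yet within each `value` group the rows come back with `col2` (the alias of `key`) in descending order.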
```scala
scala> Seq[(Integer, Integer)](
     |         (1, 1),
     |         (1, 3),
     |         (2, 3),
     |         (3, 3),
     |         (4, null),
     |         (5, null)
     |       ).toDF("key", "value").createOrReplaceTempView("src")

scala> sql(
     |         """
     |           |SELECT MAX(value) as value, key as col2
     |           |FROM src
     |           |GROUP BY key
     |           |ORDER BY value desc, key
     |         """.stripMargin).show
+-----+----+
|value|col2|
+-----+----+
|    3|   3|
|    3|   2|
|    3|   1|
| null|   5|
| null|   4|
+-----+----+
```
Here is the explain output:

```SQL
== Parsed Logical Plan ==
'Sort ['value DESC NULLS LAST, 'key ASC NULLS FIRST], true
+- 'Aggregate ['key], ['MAX('value) AS value#9, 'key AS col2#10]
   +- 'UnresolvedRelation `src`

== Analyzed Logical Plan ==
value: int, col2: int
Project [value#9, col2#10]
+- Sort [value#9 DESC NULLS LAST, col2#10 DESC NULLS LAST], true
   +- Aggregate [key#5], [max(value#6) AS value#9, key#5 AS col2#10]
      +- SubqueryAlias src
         +- Project [_1#2 AS key#5, _2#3 AS value#6]
            +- LocalRelation [_1#2, _2#3]
```
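The sort direction of `key` is wrongly changed from ASC to DESC while resolving `Sort` in
resolveAggregateFunctions. This patch fixes it by zipping `resolvedAliasedOrdering` with
`unresolvedSortOrders` instead of `sortOrder` when computing the evaluated orderings.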

The above testcase models TPCDS-Q71 and thus we have the same issue in Q71 as well.

## How was this patch tested?
A few tests are added in SQLQuerySuite.

Author: Dilip Biswal <db...@us.ibm.com>

Closes #20453 from dilipbiswal/local_spark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ff1d96f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ff1d96f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ff1d96f

Branch: refs/heads/master
Commit: 9ff1d96f01e2c89acfd248db917e068b93f519a6
Parents: dd242ba
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Wed Jan 31 13:52:47 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 31 13:52:47 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 41 +++++++++++++++++++-
 2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ff1d96f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 91cb036..251099f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1493,7 +1493,7 @@ class Analyzer(
           // to push down this ordering expression and can reference the original aggregate
           // expression instead.
           val needsPushDown = ArrayBuffer.empty[NamedExpression]
-          val evaluatedOrderings = resolvedAliasedOrdering.zip(sortOrder).map {
+          val evaluatedOrderings = resolvedAliasedOrdering.zip(unresolvedSortOrders).map {
             case (evaluated, order) =>
               val index = originalAggExprs.indexWhere {
                 case Alias(child, _) => child semanticEquals evaluated.child

http://git-wip-us.apache.org/repos/asf/spark/blob/9ff1d96f/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index ffd736d..8f14575 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 
 import java.io.File
-import java.math.MathContext
 import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
@@ -1618,6 +1617,46 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-23281: verify the correctness of sort direction on composite order by clause") {
+    withTempView("src") {
+      Seq[(Integer, Integer)](
+        (1, 1),
+        (1, 3),
+        (2, 3),
+        (3, 3),
+        (4, null),
+        (5, null)
+      ).toDF("key", "value").createOrReplaceTempView("src")
+
+      checkAnswer(sql(
+        """
+          |SELECT MAX(value) as value, key as col2
+          |FROM src
+          |GROUP BY key
+          |ORDER BY value desc, key
+        """.stripMargin),
+        Seq(Row(3, 1), Row(3, 2), Row(3, 3), Row(null, 4), Row(null, 5)))
+
+      checkAnswer(sql(
+        """
+          |SELECT MAX(value) as value, key as col2
+          |FROM src
+          |GROUP BY key
+          |ORDER BY value desc, key desc
+        """.stripMargin),
+        Seq(Row(3, 3), Row(3, 2), Row(3, 1), Row(null, 5), Row(null, 4)))
+
+      checkAnswer(sql(
+        """
+          |SELECT MAX(value) as value, key as col2
+          |FROM src
+          |GROUP BY key
+          |ORDER BY value asc, key desc
+        """.stripMargin),
+        Seq(Row(null, 5), Row(null, 4), Row(3, 3), Row(3, 2), Row(3, 1)))
+    }
+  }
+
   test("run sql directly on files") {
     val df = spark.range(100).toDF()
     withTempPath(f => {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org