You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/03/11 06:22:44 UTC

spark git commit: [SPARK-12718][SPARK-13720][SQL] SQL generation support for window functions

Repository: spark
Updated Branches:
  refs/heads/master 560489f4e -> 6871cc8f3


[SPARK-12718][SPARK-13720][SQL] SQL generation support for window functions

## What changes were proposed in this pull request?

Add SQL generation support for window functions. The idea is simple, just treat `Window` operator like `Project`, i.e. add subquery to its child when necessary, generate a `SELECT ... FROM ...` SQL string, implement `sql` method for window related expressions, e.g. `WindowSpecDefinition`, `WindowFrame`, etc.

This PR also fixed SPARK-13720 by improving the process of adding extra `SubqueryAlias`(the `RecoverScopingInfo` rule). Before this PR, we update the qualifiers in project list while adding the subquery. However, this is incomplete as we need to update qualifiers in all ancestors that refer attributes here. In this PR, we split `RecoverScopingInfo` into 2 rules: `AddSubQuery` and `UpdateQualifier`. `AddSubQuery` only add subquery if necessary, and `UpdateQualifier` will re-propagate and update qualifiers bottom up.

Ideally we should put the bug fix part in an individual PR, but this bug also blocks the window stuff, so I put them together here.

Many thanks to gatorsmile for the initial discussion and test cases!

## How was this patch tested?

new tests in `LogicalPlanToSQLSuite`

Author: Wenchen Fan <we...@databricks.com>

Closes #11555 from cloud-fan/window.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6871cc8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6871cc8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6871cc8f

Branch: refs/heads/master
Commit: 6871cc8f3eb54ad674dea89645599c92a8b94415
Parents: 560489f
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Mar 11 13:22:34 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 11 13:22:34 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   6 +-
 .../sql/catalyst/expressions/SortOrder.scala    |   3 +-
 .../expressions/windowExpressions.scala         |  47 +++++++-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 104 ++++++++++++-----
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  | 114 ++++++++++++++++++-
 .../sql/hive/execution/HiveComparisonTest.scala |   1 +
 6 files changed, 236 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b654827..53ea3cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1194,7 +1194,7 @@ class Analyzer(
         val withWindow = addWindow(windowExpressions, withFilter)
 
         // Finally, generate output columns according to the original projectList.
-        val finalProjectList = aggregateExprs.map (_.toAttribute)
+        val finalProjectList = aggregateExprs.map(_.toAttribute)
         Project(finalProjectList, withWindow)
 
       case p: LogicalPlan if !p.childrenResolved => p
@@ -1210,7 +1210,7 @@ class Analyzer(
         val withWindow = addWindow(windowExpressions, withAggregate)
 
         // Finally, generate output columns according to the original projectList.
-        val finalProjectList = aggregateExprs.map (_.toAttribute)
+        val finalProjectList = aggregateExprs.map(_.toAttribute)
         Project(finalProjectList, withWindow)
 
       // We only extract Window Expressions after all expressions of the Project
@@ -1225,7 +1225,7 @@ class Analyzer(
         val withWindow = addWindow(windowExpressions, withProject)
 
         // Finally, generate output columns according to the original projectList.
-        val finalProjectList = projectList.map (_.toAttribute)
+        val finalProjectList = projectList.map(_.toAttribute)
         Project(finalProjectList, withWindow)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index bd1d914..b739361 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -57,7 +57,8 @@ case class SortOrder(child: Expression, direction: SortDirection)
   override def dataType: DataType = child.dataType
   override def nullable: Boolean = child.nullable
 
-  override def toString: String = s"$child ${if (direction == Ascending) "ASC" else "DESC"}"
+  override def toString: String = s"$child ${direction.sql}"
+  override def sql: String = child.sql + " " + direction.sql
 
   def isAscending: Boolean = direction == Ascending
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 9e6bd0e..b867947 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, NoOp}
 import org.apache.spark.sql.types._
 
@@ -30,6 +31,7 @@ sealed trait WindowSpec
 
 /**
  * The specification for a window function.
+ *
  * @param partitionSpec It defines the way that input rows are partitioned.
  * @param orderSpec It defines the ordering of rows in a partition.
  * @param frameSpecification It defines the window frame in a partition.
@@ -75,6 +77,22 @@ case class WindowSpecDefinition(
   override def nullable: Boolean = true
   override def foldable: Boolean = false
   override def dataType: DataType = throw new UnsupportedOperationException
+
+  override def sql: String = {
+    val partition = if (partitionSpec.isEmpty) {
+      ""
+    } else {
+      "PARTITION BY " + partitionSpec.map(_.sql).mkString(", ")
+    }
+
+    val order = if (orderSpec.isEmpty) {
+      ""
+    } else {
+      "ORDER BY " + orderSpec.map(_.sql).mkString(", ")
+    }
+
+    s"($partition $order ${frameSpecification.toString})"
+  }
 }
 
 /**
@@ -278,6 +296,7 @@ case class WindowExpression(
   override def nullable: Boolean = windowFunction.nullable
 
   override def toString: String = s"$windowFunction $windowSpec"
+  override def sql: String = windowFunction.sql + " OVER " + windowSpec.sql
 }
 
 /**
@@ -451,6 +470,7 @@ object SizeBasedWindowFunction {
      the window partition.""")
 case class RowNumber() extends RowNumberLike {
   override val evaluateExpression = rowNumber
+  override def sql: String = "ROW_NUMBER()"
 }
 
 /**
@@ -470,6 +490,7 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   // return the same value for equal values in the partition.
   override val frame = SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
   override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), Cast(n, DoubleType))
+  override def sql: String = "CUME_DIST()"
 }
 
 /**
@@ -499,12 +520,25 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
 case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
   def this() = this(Literal(1))
 
+  override def children: Seq[Expression] = Seq(buckets)
+
   // Validate buckets. Note that this could be relaxed, the bucket value only needs to constant
   // for each partition.
-  buckets.eval() match {
-    case b: Int if b > 0 => // Ok
-    case x => throw new AnalysisException(
-      "Buckets expression must be a foldable positive integer expression: $x")
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (!buckets.foldable) {
+      return TypeCheckFailure(s"Buckets expression must be foldable, but got $buckets")
+    }
+
+    if (buckets.dataType != IntegerType) {
+      return TypeCheckFailure(s"Buckets expression must be integer type, but got $buckets")
+    }
+
+    val i = buckets.eval().asInstanceOf[Int]
+    if (i > 0) {
+      TypeCheckSuccess
+    } else {
+      TypeCheckFailure(s"Buckets expression must be positive, but got: $i")
+    }
   }
 
   private val bucket = AttributeReference("bucket", IntegerType, nullable = false)()
@@ -608,6 +642,7 @@ abstract class RankLike extends AggregateWindowFunction {
 case class Rank(children: Seq[Expression]) extends RankLike {
   def this() = this(Nil)
   override def withOrder(order: Seq[Expression]): Rank = Rank(order)
+  override def sql: String = "RANK()"
 }
 
 /**
@@ -632,6 +667,7 @@ case class DenseRank(children: Seq[Expression]) extends RankLike {
   override val updateExpressions = increaseRank +: children
   override val aggBufferAttributes = rank +: orderAttrs
   override val initialValues = zero +: orderInit
+  override def sql: String = "DENSE_RANK()"
 }
 
 /**
@@ -658,4 +694,5 @@ case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBase
   override val evaluateExpression = If(GreaterThan(n, one),
       Divide(Cast(Subtract(rank, one), DoubleType), Cast(Subtract(n, one), DoubleType)),
       Literal(0.0d))
+  override def sql: String = "PERCENT_RANK()"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 683f738..bf12982 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -42,7 +42,7 @@ case class SubqueryHolder(query: String) extends LeafExpression with Unevaluable
 }
 
 /**
- * A builder class used to convert a resolved logical plan into a SQL query string.  Note that this
+ * A builder class used to convert a resolved logical plan into a SQL query string.  Note that not
  * all resolved logical plan are convertible.  They either don't have corresponding SQL
  * representations (e.g. logical plans that operate on local Scala collections), or are simply not
  * supported by this builder (yet).
@@ -103,6 +103,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case p: Aggregate =>
       aggregateToSQL(p)
 
+    case w: Window =>
+      windowToSQL(w)
+
     case Limit(limitExpr, child) =>
       s"${toSQL(child)} LIMIT ${limitExpr.sql}"
 
@@ -123,12 +126,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
           build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
       }
 
-    case p: Filter =>
-      val whereOrHaving = p.child match {
+    case Filter(condition, child) =>
+      val whereOrHaving = child match {
         case _: Aggregate => "HAVING"
         case _ => "WHERE"
       }
-      build(toSQL(p.child), whereOrHaving, p.condition.sql)
+      build(toSQL(child), whereOrHaving, condition.sql)
 
     case p @ Distinct(u: Union) if u.children.length > 1 =>
       val childrenSql = u.children.map(c => s"(${toSQL(c)})")
@@ -179,7 +182,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
       build(
         toSQL(p.child),
         if (p.global) "ORDER BY" else "SORT BY",
-        p.order.map { case SortOrder(e, dir) => s"${e.sql} ${dir.sql}" }.mkString(", ")
+        p.order.map(_.sql).mkString(", ")
       )
 
     case p: RepartitionByExpression =>
@@ -268,9 +271,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         case attr: Attribute if groupByAttrMap.contains(attr) => groupByAttrMap(attr)
       }
     }
-    val groupingSetSQL =
-      "GROUPING SETS(" +
-        groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
+    val groupingSetSQL = "GROUPING SETS(" +
+      groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
 
     val aggExprs = agg.aggregateExpressions.map { case expr =>
       expr.transformDown {
@@ -297,22 +299,50 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     )
   }
 
+  private def windowToSQL(w: Window): String = {
+    build(
+      "SELECT",
+      (w.child.output ++ w.windowExpressions).map(_.sql).mkString(", "),
+      if (w.child == OneRowRelation) "" else "FROM",
+      toSQL(w.child)
+    )
+  }
+
   object Canonicalizer extends RuleExecutor[LogicalPlan] {
     override protected def batches: Seq[Batch] = Seq(
-      Batch("Canonicalizer", FixedPoint(100),
+      Batch("Collapse Project", FixedPoint(100),
         // The `WidenSetOperationTypes` analysis rule may introduce extra `Project`s over
         // `Aggregate`s to perform type casting.  This rule merges these `Project`s into
         // `Aggregate`s.
-        CollapseProject,
-
+        CollapseProject),
+      Batch("Recover Scoping Info", Once,
         // Used to handle other auxiliary `Project`s added by analyzer (e.g.
         // `ResolveAggregateFunctions` rule)
-        RecoverScopingInfo
+        AddSubquery,
+        // Previous rule will add extra sub-queries, this rule is used to re-propagate and update
+        // the qualifiers bottom up, e.g.:
+        //
+        // Sort
+        //   ordering = t1.a
+        //   Project
+        //     projectList = [t1.a, t1.b]
+        //     Subquery gen_subquery
+        //       child ...
+        //
+        // will be transformed to:
+        //
+        // Sort
+        //   ordering = gen_subquery.a
+        //   Project
+        //     projectList = [gen_subquery.a, gen_subquery.b]
+        //     Subquery gen_subquery
+        //       child ...
+        UpdateQualifiers
       )
     )
 
-    object RecoverScopingInfo extends Rule[LogicalPlan] {
-      override def apply(tree: LogicalPlan): LogicalPlan = tree transform {
+    object AddSubquery extends Rule[LogicalPlan] {
+      override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
         // This branch handles aggregate functions within HAVING clauses.  For example:
         //
         //   SELECT key FROM src GROUP BY key HAVING max(value) > "val_255"
@@ -324,8 +354,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         //    +- Filter ...
         //        +- Aggregate ...
         //            +- MetastoreRelation default, src, None
-        case plan @ Project(_, Filter(_, _: Aggregate)) =>
-          wrapChildWithSubquery(plan)
+        case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan)
+
+        case w @ Window(_, _, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w)
 
         case plan @ Project(_,
           _: SubqueryAlias
@@ -338,20 +369,39 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
             | _: Sample
         ) => plan
 
-        case plan: Project =>
-          wrapChildWithSubquery(plan)
+        case plan: Project => wrapChildWithSubquery(plan)
+
+        // We will generate "SELECT ... FROM ..." for Window operator, so its child operator should
+        // be able to put in the FROM clause, or we wrap it with a subquery.
+        case w @ Window(_, _, _, _,
+          _: SubqueryAlias
+            | _: Filter
+            | _: Join
+            | _: MetastoreRelation
+            | OneRowRelation
+            | _: LocalLimit
+            | _: GlobalLimit
+            | _: Sample
+        ) => w
+
+        case w: Window => wrapChildWithSubquery(w)
       }
 
-      def wrapChildWithSubquery(project: Project): Project = project match {
-        case Project(projectList, child) =>
-          val alias = SQLBuilder.newSubqueryName
-          val childAttributes = child.outputSet
-          val aliasedProjectList = projectList.map(_.transform {
-            case a: Attribute if childAttributes.contains(a) =>
-              a.withQualifiers(alias :: Nil)
-          }.asInstanceOf[NamedExpression])
+      private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
+        val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
+        plan.withNewChildren(Seq(newChild))
+      }
+    }
 
-          Project(aliasedProjectList, SubqueryAlias(alias, child))
+    object UpdateQualifiers extends Rule[LogicalPlan] {
+      override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
+        case plan =>
+          val inputAttributes = plan.children.flatMap(_.output)
+          plan transformExpressions {
+            case a: AttributeReference if !plan.producedAttributes.contains(a) =>
+              val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
+              a.withQualifiers(qualifier.getOrElse(Nil))
+          }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index ccce987..198652b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -67,7 +67,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
              |
              |# Resolved query plan:
              |${df.queryExecution.analyzed.treeString}
-           """.stripMargin)
+           """.stripMargin, e)
     }
 
     try {
@@ -84,8 +84,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
            |
            |# Resolved query plan:
            |${df.queryExecution.analyzed.treeString}
-         """.stripMargin,
-        cause)
+         """.stripMargin, cause)
     }
   }
 
@@ -331,6 +330,10 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     checkHiveQl("SELECT id FROM parquet_t0 DISTRIBUTE BY id SORT BY id")
   }
 
+  test("SPARK-13720: sort by after having") {
+    checkHiveQl("SELECT COUNT(value) FROM parquet_t1 GROUP BY key HAVING MAX(key) > 0 SORT BY key")
+  }
+
   test("distinct aggregation") {
     checkHiveQl("SELECT COUNT(DISTINCT id) FROM parquet_t0")
   }
@@ -442,4 +445,109 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
       "f1", "b[0].f1", "f1", "c[foo]", "d[0]"
     )
   }
+
+  test("window basic") {
+    checkHiveQl("SELECT MAX(value) OVER (PARTITION BY key % 3) FROM parquet_t1")
+    checkHiveQl(
+      """
+         |SELECT key, value, ROUND(AVG(key) OVER (), 2)
+         |FROM parquet_t1 ORDER BY key
+      """.stripMargin)
+    checkHiveQl(
+      """
+         |SELECT value, MAX(key + 1) OVER (PARTITION BY key % 5 ORDER BY key % 7) AS max
+         |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("multiple window functions in one expression") {
+    checkHiveQl(
+      """
+        |SELECT
+        |  MAX(key) OVER (ORDER BY key DESC, value) / MIN(key) OVER (PARTITION BY key % 3)
+        |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("regular expressions and window functions in one expression") {
+    checkHiveQl("SELECT MAX(key) OVER (PARTITION BY key % 3) + key FROM parquet_t1")
+  }
+
+  test("aggregate functions and window functions in one expression") {
+    checkHiveQl("SELECT MAX(c) + COUNT(a) OVER () FROM parquet_t2 GROUP BY a, b")
+  }
+
+  test("window with different window specification") {
+    checkHiveQl(
+      """
+         |SELECT key, value,
+         |DENSE_RANK() OVER (ORDER BY key, value) AS dr,
+         |MAX(value) OVER (PARTITION BY key ORDER BY key ASC) AS max
+         |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("window with the same window specification with aggregate + having") {
+    checkHiveQl(
+      """
+         |SELECT key, value,
+         |MAX(value) OVER (PARTITION BY key % 5 ORDER BY key DESC) AS max
+         |FROM parquet_t1 GROUP BY key, value HAVING key > 5
+      """.stripMargin)
+  }
+
+  test("window with the same window specification with aggregate functions") {
+    checkHiveQl(
+      """
+         |SELECT key, value,
+         |MAX(value) OVER (PARTITION BY key % 5 ORDER BY key) AS max
+         |FROM parquet_t1 GROUP BY key, value
+      """.stripMargin)
+  }
+
+  test("window with the same window specification with aggregate") {
+    checkHiveQl(
+      """
+         |SELECT key, value,
+         |DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
+         |COUNT(key)
+         |FROM parquet_t1 GROUP BY key, value
+      """.stripMargin)
+  }
+
+  test("window with the same window specification without aggregate and filter") {
+    checkHiveQl(
+      """
+         |SELECT key, value,
+         |DENSE_RANK() OVER (DISTRIBUTE BY key SORT BY key, value) AS dr,
+         |COUNT(key) OVER(DISTRIBUTE BY key SORT BY key, value) AS ca
+         |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("window clause") {
+    checkHiveQl(
+      """
+         |SELECT key, MAX(value) OVER w1 AS MAX, MIN(value) OVER w2 AS min
+         |FROM parquet_t1
+         |WINDOW w1 AS (PARTITION BY key % 5 ORDER BY key), w2 AS (PARTITION BY key % 6)
+      """.stripMargin)
+  }
+
+  test("special window functions") {
+    checkHiveQl(
+      """
+        |SELECT
+        |  RANK() OVER w,
+        |  PERCENT_RANK() OVER w,
+        |  DENSE_RANK() OVER w,
+        |  ROW_NUMBER() OVER w,
+        |  NTILE(10) OVER w,
+        |  CUME_DIST() OVER w,
+        |  LAG(key, 2) OVER w,
+        |  LEAD(key, 2) OVER w
+        |FROM parquet_t1
+        |WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
+      """.stripMargin)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6871cc8f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index afb8162..1053246 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -449,6 +449,7 @@ abstract class HiveComparisonTest
                   |Failed to execute query using catalyst:
                   |Error: ${e.getMessage}
                   |${stackTraceToString(e)}
+                  |$queryString
                   |$query
                   |== HIVE - ${hive.size} row(s) ==
                   |${hive.mkString("\n")}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org