You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2015/12/18 00:16:40 UTC

[1/2] spark git commit: [SPARK-8641][SQL] Native Spark Window functions

Repository: spark
Updated Branches:
  refs/heads/master ed6ebda5c -> 658f66e62


http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
index 92bb9e6..98bbdf0 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionQuerySuite.scala
@@ -454,6 +454,9 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
       |window w1 as (distribute by p_mfgr sort by p_name rows between 2 preceding and 2 following)
     """.stripMargin, reset = false)
 
+  /* Disabled because:
+     - Spark uses a different default stddev.
+     - Tiny numerical differences in stddev results.
   createQueryTest("windowing.q -- 15. testExpressions",
     s"""
       |select  p_mfgr,p_name, p_size,
@@ -472,7 +475,7 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
       |window w1 as (distribute by p_mfgr sort by p_mfgr, p_name
       |             rows between 2 preceding and 2 following)
     """.stripMargin, reset = false)
-
+  */
   createQueryTest("windowing.q -- 16. testMultipleWindows",
     s"""
       |select  p_mfgr,p_name, p_size,
@@ -530,6 +533,9 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
   // when running this test suite under Java 7 and 8.
   // We change the original sql query a little bit for making the test suite passed
   // under different JDK
+  /* Disabled because:
+     - Spark uses a different default stddev.
+     - Tiny numerical differences in stddev results.
   createQueryTest("windowing.q -- 20. testSTATs",
     """
       |select p_mfgr,p_name, p_size, sdev, sdev_pop, uniq_data, var, cor, covarp
@@ -547,7 +553,7 @@ class HiveWindowFunctionQuerySuite extends HiveComparisonTest with BeforeAndAfte
       |) t lateral view explode(uniq_size) d as uniq_data
       |order by p_mfgr,p_name, p_size, sdev, sdev_pop, uniq_data, var, cor, covarp
     """.stripMargin, reset = false)
-
+  */
   createQueryTest("windowing.q -- 21. testDISTs",
     """
       |select  p_mfgr,p_name, p_size,

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5958777..0eeb62c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -476,7 +476,6 @@ class HiveContext private[hive](
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
         ExtractPythonUDFs ::
-        ResolveHiveWindowFunction ::
         PreInsertCastAndRename ::
         (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 091caab..da41b65 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -353,6 +353,14 @@ private[hive] object HiveQl extends Logging {
   }
 
   /** Extractor for matching Hive's AST Tokens. */
+  private[hive] case class Token(name: String, children: Seq[ASTNode]) extends Node {
+    def getName(): String = name
+    def getChildren(): java.util.List[Node] = {
+      val col = new java.util.ArrayList[Node](children.size)
+      children.foreach(col.add(_))
+      col
+    }
+  }
   object Token {
     /** @return matches of the form (tokenName, children). */
     def unapply(t: Any): Option[(String, Seq[ASTNode])] = t match {
@@ -360,6 +368,7 @@ private[hive] object HiveQl extends Logging {
         CurrentOrigin.setPosition(t.getLine, t.getCharPositionInLine)
         Some((t.getText,
           Option(t.getChildren).map(_.asScala.toList).getOrElse(Nil).asInstanceOf[Seq[ASTNode]]))
+      case t: Token => Some((t.name, t.children))
       case _ => None
     }
   }
@@ -1617,17 +1626,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       UnresolvedExtractValue(nodeToExpr(child), nodeToExpr(ordinal))
 
     /* Window Functions */
-    case Token("TOK_FUNCTION", Token(name, Nil) +: args :+ Token("TOK_WINDOWSPEC", spec)) =>
-      val function = UnresolvedWindowFunction(name, args.map(nodeToExpr))
-      nodesToWindowSpecification(spec) match {
-        case reference: WindowSpecReference =>
-          UnresolvedWindowExpression(function, reference)
-        case definition: WindowSpecDefinition =>
-          WindowExpression(function, definition)
-      }
-    case Token("TOK_FUNCTIONSTAR", Token(name, Nil) :: Token("TOK_WINDOWSPEC", spec) :: Nil) =>
-      // Safe to use Literal(1)?
-      val function = UnresolvedWindowFunction(name, Literal(1) :: Nil)
+    case Token(name, args :+ Token("TOK_WINDOWSPEC", spec)) =>
+      val function = nodeToExpr(Token(name, args))
       nodesToWindowSpecification(spec) match {
         case reference: WindowSpecReference =>
           UnresolvedWindowExpression(function, reference)

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 2e8c026..a1787fc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -261,230 +261,6 @@ private[hive] case class HiveGenericUDF(funcWrapper: HiveFunctionWrapper, childr
 }
 
 /**
- * Resolves [[UnresolvedWindowFunction]] to [[HiveWindowFunction]].
- */
-private[spark] object ResolveHiveWindowFunction extends Rule[LogicalPlan] {
-  private def shouldResolveFunction(
-      unresolvedWindowFunction: UnresolvedWindowFunction,
-      windowSpec: WindowSpecDefinition): Boolean = {
-    unresolvedWindowFunction.childrenResolved && windowSpec.childrenResolved
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p: LogicalPlan if !p.childrenResolved => p
-
-    // We are resolving WindowExpressions at here. When we get here, we have already
-    // replaced those WindowSpecReferences.
-    case p: LogicalPlan =>
-      p transformExpressions {
-        // We will not start to resolve the function unless all arguments are resolved
-        // and all expressions in window spec are fixed.
-        case WindowExpression(
-          u @ UnresolvedWindowFunction(name, children),
-          windowSpec: WindowSpecDefinition) if shouldResolveFunction(u, windowSpec) =>
-          // First, let's find the window function info.
-          val windowFunctionInfo: WindowFunctionInfo =
-            Option(FunctionRegistry.getWindowFunctionInfo(name.toLowerCase)).getOrElse(
-              throw new AnalysisException(s"Couldn't find window function $name"))
-
-          // Get the class of this function.
-          // In Hive 0.12, there is no windowFunctionInfo.getFunctionClass. So, we use
-          // windowFunctionInfo.getfInfo().getFunctionClass for both Hive 0.13 and Hive 0.13.1.
-          val functionClass = windowFunctionInfo.getFunctionClass()
-          val newChildren =
-            // Rank(), DENSE_RANK(), CUME_DIST(), and PERCENT_RANK() do not take explicit
-            // input parameters and requires implicit parameters, which
-            // are expressions in Order By clause.
-            if (classOf[GenericUDAFRank].isAssignableFrom(functionClass)) {
-              if (children.nonEmpty) {
-               throw new AnalysisException(s"$name does not take input parameters.")
-              }
-              windowSpec.orderSpec.map(_.child)
-            } else {
-              children
-            }
-
-          // If the class is UDAF, we need to use UDAFBridge.
-          val isUDAFBridgeRequired =
-            if (classOf[UDAF].isAssignableFrom(functionClass)) {
-              true
-            } else {
-              false
-            }
-
-          // Create the HiveWindowFunction. For the meaning of isPivotResult, see the doc of
-          // HiveWindowFunction.
-          val windowFunction =
-            HiveWindowFunction(
-              new HiveFunctionWrapper(functionClass.getName),
-              windowFunctionInfo.isPivotResult,
-              isUDAFBridgeRequired,
-              newChildren)
-
-          // Second, check if the specified window function can accept window definition.
-          windowSpec.frameSpecification match {
-            case frame: SpecifiedWindowFrame if !windowFunctionInfo.isSupportsWindow =>
-              // This Hive window function does not support user-speficied window frame.
-              throw new AnalysisException(
-                s"Window function $name does not take a frame specification.")
-            case frame: SpecifiedWindowFrame if windowFunctionInfo.isSupportsWindow &&
-                                                windowFunctionInfo.isPivotResult =>
-              // These two should not be true at the same time when a window frame is defined.
-              // If so, throw an exception.
-              throw new AnalysisException(s"Could not handle Hive window function $name because " +
-                s"it supports both a user specified window frame and pivot result.")
-            case _ => // OK
-          }
-          // Resolve those UnspecifiedWindowFrame because the physical Window operator still needs
-          // a window frame specification to work.
-          val newWindowSpec = windowSpec.frameSpecification match {
-            case UnspecifiedFrame =>
-              val newWindowFrame =
-                SpecifiedWindowFrame.defaultWindowFrame(
-                  windowSpec.orderSpec.nonEmpty,
-                  windowFunctionInfo.isSupportsWindow)
-              WindowSpecDefinition(windowSpec.partitionSpec, windowSpec.orderSpec, newWindowFrame)
-            case _ => windowSpec
-          }
-
-          // Finally, we create a WindowExpression with the resolved window function and
-          // specified window spec.
-          WindowExpression(windowFunction, newWindowSpec)
-      }
-  }
-}
-
-/**
- * A [[WindowFunction]] implementation wrapping Hive's window function.
- * @param funcWrapper The wrapper for the Hive Window Function.
- * @param pivotResult If it is true, the Hive function will return a list of values representing
- *                    the values of the added columns. Otherwise, a single value is returned for
- *                    current row.
- * @param isUDAFBridgeRequired If it is true, the function returned by functionWrapper's
- *                             createFunction is UDAF, we need to use GenericUDAFBridge to wrap
- *                             it as a GenericUDAFResolver2.
- * @param children Input parameters.
- */
-private[hive] case class HiveWindowFunction(
-    funcWrapper: HiveFunctionWrapper,
-    pivotResult: Boolean,
-    isUDAFBridgeRequired: Boolean,
-    children: Seq[Expression]) extends WindowFunction
-  with HiveInspectors with Unevaluable {
-
-  // Hive window functions are based on GenericUDAFResolver2.
-  type UDFType = GenericUDAFResolver2
-
-  @transient
-  protected lazy val resolver: GenericUDAFResolver2 =
-    if (isUDAFBridgeRequired) {
-      new GenericUDAFBridge(funcWrapper.createFunction[UDAF]())
-    } else {
-      funcWrapper.createFunction[GenericUDAFResolver2]()
-    }
-
-  @transient
-  protected lazy val inputInspectors = children.map(toInspector).toArray
-
-  // The GenericUDAFEvaluator used to evaluate the window function.
-  @transient
-  protected lazy val evaluator: GenericUDAFEvaluator = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
-    resolver.getEvaluator(parameterInfo)
-  }
-
-  // The object inspector of values returned from the Hive window function.
-  @transient
-  protected lazy val returnInspector = {
-    evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inputInspectors)
-  }
-
-  override val dataType: DataType =
-    if (!pivotResult) {
-      inspectorToDataType(returnInspector)
-    } else {
-      // If pivotResult is true, we should take the element type out as the data type of this
-      // function.
-      inspectorToDataType(returnInspector) match {
-        case ArrayType(dt, _) => dt
-        case _ =>
-          sys.error(
-            s"error resolve the data type of window function ${funcWrapper.functionClassName}")
-      }
-    }
-
-  override def nullable: Boolean = true
-
-  @transient
-  lazy val inputProjection = new InterpretedProjection(children)
-
-  @transient
-  private var hiveEvaluatorBuffer: AggregationBuffer = _
-  // Output buffer.
-  private var outputBuffer: Any = _
-
-  @transient
-  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
-
-  override def init(): Unit = {
-    evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, inputInspectors)
-  }
-
-  // Reset the hiveEvaluatorBuffer and outputPosition
-  override def reset(): Unit = {
-    // We create a new aggregation buffer to workaround the bug in GenericUDAFRowNumber.
-    // Basically, GenericUDAFRowNumberEvaluator.reset calls RowNumberBuffer.init.
-    // However, RowNumberBuffer.init does not really reset this buffer.
-    hiveEvaluatorBuffer = evaluator.getNewAggregationBuffer
-    evaluator.reset(hiveEvaluatorBuffer)
-  }
-
-  override def prepareInputParameters(input: InternalRow): AnyRef = {
-    wrap(
-      inputProjection(input),
-      inputInspectors,
-      new Array[AnyRef](children.length),
-      inputDataTypes)
-  }
-
-  // Add input parameters for a single row.
-  override def update(input: AnyRef): Unit = {
-    evaluator.iterate(hiveEvaluatorBuffer, input.asInstanceOf[Array[AnyRef]])
-  }
-
-  override def batchUpdate(inputs: Array[AnyRef]): Unit = {
-    var i = 0
-    while (i < inputs.length) {
-      evaluator.iterate(hiveEvaluatorBuffer, inputs(i).asInstanceOf[Array[AnyRef]])
-      i += 1
-    }
-  }
-
-  override def evaluate(): Unit = {
-    outputBuffer = unwrap(evaluator.evaluate(hiveEvaluatorBuffer), returnInspector)
-  }
-
-  override def get(index: Int): Any = {
-    if (!pivotResult) {
-      // if pivotResult is false, we will get a single value for all rows in the frame.
-      outputBuffer
-    } else {
-      // if pivotResult is true, we will get a ArrayData having the same size with the size
-      // of the window frame. At here, we will return the result at the position of
-      // index in the output buffer.
-      outputBuffer.asInstanceOf[ArrayData].get(index, dataType)
-    }
-  }
-
-  override def toString: String = {
-    s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
-  }
-
-  override def newInstance(): WindowFunction =
-    new HiveWindowFunction(funcWrapper, pivotResult, isUDAFBridgeRequired, children)
-}
-
-/**
  * Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
  * [[Generator]].  Note that the semantics of Generators do not allow
  * Generators to maintain state in between input rows.  Thus UDTFs that rely on partitioning

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
deleted file mode 100644
index 2c98f1c..0000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameWindowSuite.scala
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.{Row, QueryTest}
-import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-
-class HiveDataFrameWindowSuite extends QueryTest with TestHiveSingleton {
-  import hiveContext.implicits._
-  import hiveContext.sql
-
-  test("reuse window partitionBy") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    val w = Window.partitionBy("key").orderBy("value")
-
-    checkAnswer(
-      df.select(
-        lead("key", 1).over(w),
-        lead("value", 1).over(w)),
-      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
-  }
-
-  test("reuse window orderBy") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    val w = Window.orderBy("value").partitionBy("key")
-
-    checkAnswer(
-      df.select(
-        lead("key", 1).over(w),
-        lead("value", 1).over(w)),
-      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
-  }
-
-  test("lead") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-
-    checkAnswer(
-      df.select(
-        lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
-      sql(
-        """SELECT
-          | lead(value) OVER (PARTITION BY key ORDER BY value)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("lag") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-
-    checkAnswer(
-      df.select(
-        lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
-      sql(
-        """SELECT
-          | lag(value) OVER (PARTITION BY key ORDER BY value)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("lead with default value") {
-    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
-                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        lead("value", 2, "n/a").over(Window.partitionBy("key").orderBy("value"))),
-      sql(
-        """SELECT
-          | lead(value, 2, "n/a") OVER (PARTITION BY key ORDER BY value)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("lag with default value") {
-    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
-                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        lag("value", 2, "n/a").over(Window.partitionBy($"key").orderBy($"value"))),
-      sql(
-        """SELECT
-          | lag(value, 2, "n/a") OVER (PARTITION BY key ORDER BY value)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("rank functions in unspecific window") {
-    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        max("key").over(Window.partitionBy("value").orderBy("key")),
-        min("key").over(Window.partitionBy("value").orderBy("key")),
-        mean("key").over(Window.partitionBy("value").orderBy("key")),
-        count("key").over(Window.partitionBy("value").orderBy("key")),
-        sum("key").over(Window.partitionBy("value").orderBy("key")),
-        ntile(2).over(Window.partitionBy("value").orderBy("key")),
-        rowNumber().over(Window.partitionBy("value").orderBy("key")),
-        denseRank().over(Window.partitionBy("value").orderBy("key")),
-        rank().over(Window.partitionBy("value").orderBy("key")),
-        cumeDist().over(Window.partitionBy("value").orderBy("key")),
-        percentRank().over(Window.partitionBy("value").orderBy("key"))),
-      sql(
-        s"""SELECT
-           |key,
-           |max(key) over (partition by value order by key),
-           |min(key) over (partition by value order by key),
-           |avg(key) over (partition by value order by key),
-           |count(key) over (partition by value order by key),
-           |sum(key) over (partition by value order by key),
-           |ntile(2) over (partition by value order by key),
-           |row_number() over (partition by value order by key),
-           |dense_rank() over (partition by value order by key),
-           |rank() over (partition by value order by key),
-           |cume_dist() over (partition by value order by key),
-           |percent_rank() over (partition by value order by key)
-           |FROM window_table""".stripMargin).collect())
-  }
-
-  test("aggregation and rows between") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        avg("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 2))),
-      sql(
-        """SELECT
-          | avg(key) OVER
-          |   (PARTITION BY value ORDER BY key ROWS BETWEEN 1 preceding and 2 following)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("aggregation and range betweens") {
-    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        avg("key").over(Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 1))),
-      sql(
-        """SELECT
-          | avg(key) OVER
-          |   (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and 1 following)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("aggregation and rows betweens with unbounded") {
-    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)),
-        last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
-        last("value").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 3))),
-      sql(
-        """SELECT
-          | key,
-          | last_value(value) OVER
-          |   (PARTITION BY value ORDER BY key ROWS between current row and unbounded following),
-          | last_value(value) OVER
-          |   (PARTITION BY value ORDER BY key ROWS between unbounded preceding and current row),
-          | last_value(value) OVER
-          |   (PARTITION BY value ORDER BY key ROWS between 1 preceding and 3 following)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("aggregation and range betweens with unbounded") {
-    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value")
-    df.registerTempTable("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        last("value").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1))
-          .equalTo("2")
-          .as("last_v"),
-        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1))
-          .as("avg_key1"),
-        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, Long.MaxValue))
-          .as("avg_key2"),
-        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 0))
-          .as("avg_key3")
-      ),
-      sql(
-        """SELECT
-          | key,
-          | last_value(value) OVER
-          |   (PARTITION BY value ORDER BY key RANGE BETWEEN 2 preceding and 1 preceding) == "2",
-          | avg(key) OVER
-          |   (PARTITION BY value ORDER BY key RANGE BETWEEN unbounded preceding and 1 following),
-          | avg(key) OVER
-          |   (PARTITION BY value ORDER BY key RANGE BETWEEN current row and unbounded following),
-          | avg(key) OVER
-          |   (PARTITION BY value ORDER BY key RANGE BETWEEN 1 preceding and current row)
-          | FROM window_table""".stripMargin).collect())
-  }
-
-  test("reverse sliding range frame") {
-    val df = Seq(
-      (1, "Thin", "Cell Phone", 6000),
-      (2, "Normal", "Tablet", 1500),
-      (3, "Mini", "Tablet", 5500),
-      (4, "Ultra thin", "Cell Phone", 5500),
-      (5, "Very thin", "Cell Phone", 6000),
-      (6, "Big", "Tablet", 2500),
-      (7, "Bendable", "Cell Phone", 3000),
-      (8, "Foldable", "Cell Phone", 3000),
-      (9, "Pro", "Tablet", 4500),
-      (10, "Pro2", "Tablet", 6500)).
-      toDF("id", "product", "category", "revenue")
-    val window = Window.
-      partitionBy($"category").
-      orderBy($"revenue".desc).
-      rangeBetween(-2000L, 1000L)
-    checkAnswer(
-      df.select(
-        $"id",
-        avg($"revenue").over(window).cast("int")),
-      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
-        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
-        Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
-        Row(10, 6000) :: Nil)
-  }
-
-  // This is here to illustrate the fact that reverse order also reverses offsets.
-  test("reverse unbounded range frame") {
-    val df = Seq(1, 2, 4, 3, 2, 1).
-      map(Tuple1.apply).
-      toDF("value")
-    val window = Window.orderBy($"value".desc)
-    checkAnswer(
-      df.select(
-        $"value",
-        sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
-        sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
-      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
-        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
new file mode 100644
index 0000000..c05dbfd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * This suite contains a couple of Hive window tests which fail in the typical setup due to tiny
+ * numerical differences or due semantic differences between Hive and Spark.
+ */
+class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS part")
+    sql(
+      """
+        |CREATE TABLE part(
+        |  p_partkey INT,
+        |  p_name STRING,
+        |  p_mfgr STRING,
+        |  p_brand STRING,
+        |  p_type STRING,
+        |  p_size INT,
+        |  p_container STRING,
+        |  p_retailprice DOUBLE,
+        |  p_comment STRING)
+      """.stripMargin)
+    val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").getCanonicalPath
+    sql(
+      s"""
+         |LOAD DATA LOCAL INPATH '$testData1' overwrite into table part
+      """.stripMargin)
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS part")
+  }
+
+  test("windowing.q -- 15. testExpressions") {
+    // Moved because:
+    // - Spark uses a different default stddev (sample instead of pop)
+    // - Tiny numerical differences in stddev results.
+    // - Different StdDev behavior when n=1 (NaN instead of 0)
+    checkAnswer(sql(s"""
+      |select  p_mfgr,p_name, p_size,
+      |rank() over(distribute by p_mfgr sort by p_name) as r,
+      |dense_rank() over(distribute by p_mfgr sort by p_name) as dr,
+      |cume_dist() over(distribute by p_mfgr sort by p_name) as cud,
+      |percent_rank() over(distribute by p_mfgr sort by p_name) as pr,
+      |ntile(3) over(distribute by p_mfgr sort by p_name) as nt,
+      |count(p_size) over(distribute by p_mfgr sort by p_name) as ca,
+      |avg(p_size) over(distribute by p_mfgr sort by p_name) as avg,
+      |stddev(p_size) over(distribute by p_mfgr sort by p_name) as st,
+      |first_value(p_size % 5) over(distribute by p_mfgr sort by p_name) as fv,
+      |last_value(p_size) over(distribute by p_mfgr sort by p_name) as lv,
+      |first_value(p_size) over w1  as fvW1
+      |from part
+      |window w1 as (distribute by p_mfgr sort by p_mfgr, p_name
+      |             rows between 2 preceding and 2 following)
+      """.stripMargin),
+      // scalastyle:off
+      Seq(
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 1, 1, 0.3333333333333333, 0.0, 1, 2, 2.0, 0.0, 2, 2, 2),
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 1, 1, 0.3333333333333333, 0.0, 1, 2, 2.0, 0.0, 2, 2, 2),
+        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 3, 2, 0.5, 0.4, 2, 3, 12.666666666666666, 18.475208614068027, 2, 34, 2),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 4, 3, 0.6666666666666666, 0.6, 2, 4, 11.0, 15.448840301675292, 2, 6, 2),
+        Row("Manufacturer#1", "almond aquamarine burnished black steel", 28, 5, 4, 0.8333333333333334, 0.8, 3, 5, 14.4, 15.388307249337076, 2, 28, 34),
+        Row("Manufacturer#1", "almond aquamarine pink moccasin thistle", 42, 6, 5, 1.0, 1.0, 3, 6, 19.0, 17.787636155487327, 2, 42, 6),
+        Row("Manufacturer#2", "almond antique violet chocolate turquoise", 14, 1, 1, 0.2, 0.0, 1, 1, 14.0, Double.NaN, 4, 14, 14),
+        Row("Manufacturer#2", "almond antique violet turquoise frosted", 40, 2, 2, 0.4, 0.25, 1, 2, 27.0, 18.384776310850235, 4, 40, 14),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 3, 3, 0.6, 0.5, 2, 3, 18.666666666666668, 19.42506971244462, 4, 2, 14),
+        Row("Manufacturer#2", "almond aquamarine rose maroon antique", 25, 4, 4, 0.8, 0.75, 2, 4, 20.25, 16.17353805861084, 4, 25, 40),
+        Row("Manufacturer#2", "almond aquamarine sandy cyan gainsboro", 18, 5, 5, 1.0, 1.0, 3, 5, 19.8, 14.042791745233567, 4, 18, 2),
+        Row("Manufacturer#3", "almond antique chartreuse khaki white", 17, 1, 1, 0.2, 0.0, 1, 1, 17.0,Double.NaN, 2, 17, 17),
+        Row("Manufacturer#3", "almond antique forest lavender goldenrod", 14, 2, 2, 0.4, 0.25, 1, 2, 15.5, 2.1213203435596424, 2, 14, 17),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 3, 3, 0.6, 0.5, 2, 3, 16.666666666666668, 2.516611478423583, 2, 19, 17),
+        Row("Manufacturer#3", "almond antique misty red olive", 1, 4, 4, 0.8, 0.75, 2, 4, 12.75, 8.098353742170895, 2, 1, 14),
+        Row("Manufacturer#3", "almond antique olive coral navajo", 45, 5, 5, 1.0, 1.0, 3, 5, 19.2, 16.037456157383566, 2, 45, 19),
+        Row("Manufacturer#4", "almond antique gainsboro frosted violet", 10, 1, 1, 0.2, 0.0, 1, 1, 10.0, Double.NaN, 0, 10, 10),
+        Row("Manufacturer#4", "almond antique violet mint lemon", 39, 2, 2, 0.4, 0.25, 1, 2, 24.5, 20.506096654409877, 0, 39, 10),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 3, 3, 0.6, 0.5, 2, 3, 25.333333333333332, 14.571661996262929, 0, 27, 10),
+        Row("Manufacturer#4", "almond aquamarine yellow dodger mint", 7, 4, 4, 0.8, 0.75, 2, 4, 20.75, 15.01943185787443, 0, 7, 39),
+        Row("Manufacturer#4", "almond azure aquamarine papaya violet", 12, 5, 5, 1.0, 1.0, 3, 5, 19.0, 13.583077707206124, 0, 12, 27),
+        Row("Manufacturer#5", "almond antique blue firebrick mint", 31, 1, 1, 0.2, 0.0, 1, 1, 31.0, Double.NaN, 1, 31, 31),
+        Row("Manufacturer#5", "almond antique medium spring khaki", 6, 2, 2, 0.4, 0.25, 1, 2, 18.5, 17.67766952966369, 1, 6, 31),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 3, 3, 0.6, 0.5, 2, 3, 13.0, 15.716233645501712, 1, 2, 31),
+        Row("Manufacturer#5", "almond aquamarine dodger light gainsboro", 46, 4, 4, 0.8, 0.75, 2, 4, 21.25, 20.902551678363736, 1, 46, 6),
+        Row("Manufacturer#5", "almond azure blanched chiffon midnight", 23, 5, 5, 1.0, 1.0, 3, 5, 21.6, 18.1190507477627, 1, 23, 2)))
+      // scalastyle:on
+  }
+
+  test("windowing.q -- 20. testSTATs") {
+    // Moved because:
+    // - Spark uses a different default stddev/variance (sample instead of pop)
+    // - Tiny numerical differences in aggregation results.
+    checkAnswer(sql("""
+      |select p_mfgr,p_name, p_size, sdev, sdev_pop, uniq_data, var, cor, covarp
+      |from (
+      |select  p_mfgr,p_name, p_size,
+      |stddev_pop(p_retailprice) over w1 as sdev,
+      |stddev_pop(p_retailprice) over w1 as sdev_pop,
+      |collect_set(p_size) over w1 as uniq_size,
+      |var_pop(p_retailprice) over w1 as var,
+      |corr(p_size, p_retailprice) over w1 as cor,
+      |covar_pop(p_size, p_retailprice) over w1 as covarp
+      |from part
+      |window w1 as (distribute by p_mfgr sort by p_mfgr, p_name
+      |             rows between 2 preceding and 2 following)
+      |) t lateral view explode(uniq_size) d as uniq_data
+      |order by p_mfgr,p_name, p_size, sdev, sdev_pop, uniq_data, var, cor, covarp
+      """.stripMargin),
+      // scalastyle:off
+      Seq(
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 258.10677784349247, 258.10677784349247, 2, 66619.10876874997, 0.811328754177887, 2801.7074999999995),
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 258.10677784349247, 258.10677784349247, 6, 66619.10876874997, 0.811328754177887, 2801.7074999999995),
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 258.10677784349247, 258.10677784349247, 34, 66619.10876874997, 0.811328754177887, 2801.7074999999995),
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 2, 74912.88268888886, 1.0, 4128.782222222221),
+        Row("Manufacturer#1", "almond antique burnished rose metallic", 2, 273.70217881648085, 273.70217881648085, 34, 74912.88268888886, 1.0, 4128.782222222221),
+        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 2, 53315.510023999974, 0.6956393773976641, 2210.7864),
+        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 6, 53315.510023999974, 0.6956393773976641, 2210.7864),
+        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 28, 53315.510023999974, 0.6956393773976641, 2210.7864),
+        Row("Manufacturer#1", "almond antique chartreuse lavender yellow", 34, 230.9015158547037, 230.9015158547037, 34, 53315.510023999974, 0.6956393773976641, 2210.7864),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 202.73109328368943, 202.73109328368943, 2, 41099.89618399999, 0.6307859771012139, 2009.9536000000007),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 202.73109328368943, 202.73109328368943, 6, 41099.89618399999, 0.6307859771012139, 2009.9536000000007),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 202.73109328368943, 202.73109328368943, 28, 41099.89618399999, 0.6307859771012139, 2009.9536000000007),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 202.73109328368943, 202.73109328368943, 34, 41099.89618399999, 0.6307859771012139, 2009.9536000000007),
+        Row("Manufacturer#1", "almond antique salmon chartreuse burlywood", 6, 202.73109328368943, 202.73109328368943, 42, 41099.89618399999, 0.6307859771012139, 2009.9536000000007),
+        Row("Manufacturer#1", "almond aquamarine burnished black steel", 28, 121.60645179738611, 121.60645179738611, 6, 14788.129118749992, 0.2036684720435979, 331.1337500000004),
+        Row("Manufacturer#1", "almond aquamarine burnished black steel", 28, 121.60645179738611, 121.60645179738611, 28, 14788.129118749992, 0.2036684720435979, 331.1337500000004),
+        Row("Manufacturer#1", "almond aquamarine burnished black steel", 28, 121.60645179738611, 121.60645179738611, 34, 14788.129118749992, 0.2036684720435979, 331.1337500000004),
+        Row("Manufacturer#1", "almond aquamarine burnished black steel", 28, 121.60645179738611, 121.60645179738611, 42, 14788.129118749992, 0.2036684720435979, 331.1337500000004),
+        Row("Manufacturer#1", "almond aquamarine pink moccasin thistle", 42, 96.57515864168516, 96.57515864168516, 6, 9326.761266666656, -1.4442181184933883E-4, -0.20666666666708502),
+        Row("Manufacturer#1", "almond aquamarine pink moccasin thistle", 42, 96.57515864168516, 96.57515864168516, 28, 9326.761266666656, -1.4442181184933883E-4, -0.20666666666708502),
+        Row("Manufacturer#1", "almond aquamarine pink moccasin thistle", 42, 96.57515864168516, 96.57515864168516, 42, 9326.761266666656, -1.4442181184933883E-4, -0.20666666666708502),
+        Row("Manufacturer#2", "almond antique violet chocolate turquoise", 14, 142.23631697518977, 142.23631697518977, 2, 20231.16986666666, -0.4936952655452319, -1113.7466666666658),
+        Row("Manufacturer#2", "almond antique violet chocolate turquoise", 14, 142.23631697518977, 142.23631697518977, 14, 20231.16986666666, -0.4936952655452319, -1113.7466666666658),
+        Row("Manufacturer#2", "almond antique violet chocolate turquoise", 14, 142.23631697518977, 142.23631697518977, 40, 20231.16986666666, -0.4936952655452319, -1113.7466666666658),
+        Row("Manufacturer#2", "almond antique violet turquoise frosted", 40, 137.7630649884068, 137.7630649884068, 2, 18978.662074999997, -0.5205630897335946, -1004.4812499999995),
+        Row("Manufacturer#2", "almond antique violet turquoise frosted", 40, 137.7630649884068, 137.7630649884068, 14, 18978.662074999997, -0.5205630897335946, -1004.4812499999995),
+        Row("Manufacturer#2", "almond antique violet turquoise frosted", 40, 137.7630649884068, 137.7630649884068, 25, 18978.662074999997, -0.5205630897335946, -1004.4812499999995),
+        Row("Manufacturer#2", "almond antique violet turquoise frosted", 40, 137.7630649884068, 137.7630649884068, 40, 18978.662074999997, -0.5205630897335946, -1004.4812499999995),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 130.03972279269132, 130.03972279269132, 2, 16910.329504000005, -0.46908967495720255, -766.1791999999995),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 130.03972279269132, 130.03972279269132, 14, 16910.329504000005, -0.46908967495720255, -766.1791999999995),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 130.03972279269132, 130.03972279269132, 18, 16910.329504000005, -0.46908967495720255, -766.1791999999995),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 130.03972279269132, 130.03972279269132, 25, 16910.329504000005, -0.46908967495720255, -766.1791999999995),
+        Row("Manufacturer#2", "almond aquamarine midnight light salmon", 2, 130.03972279269132, 130.03972279269132, 40, 16910.329504000005, -0.46908967495720255, -766.1791999999995),
+        Row("Manufacturer#2", "almond aquamarine rose maroon antique", 25, 135.55100986344593, 135.55100986344593, 2, 18374.076275000018, -0.6091405874714462, -1128.1787499999987),
+        Row("Manufacturer#2", "almond aquamarine rose maroon antique", 25, 135.55100986344593, 135.55100986344593, 18, 18374.076275000018, -0.6091405874714462, -1128.1787499999987),
+        Row("Manufacturer#2", "almond aquamarine rose maroon antique", 25, 135.55100986344593, 135.55100986344593, 25, 18374.076275000018, -0.6091405874714462, -1128.1787499999987),
+        Row("Manufacturer#2", "almond aquamarine rose maroon antique", 25, 135.55100986344593, 135.55100986344593, 40, 18374.076275000018, -0.6091405874714462, -1128.1787499999987),
+        Row("Manufacturer#2", "almond aquamarine sandy cyan gainsboro", 18, 156.44019460768035, 156.44019460768035, 2, 24473.534488888898, -0.9571686373491605, -1441.4466666666676),
+        Row("Manufacturer#2", "almond aquamarine sandy cyan gainsboro", 18, 156.44019460768035, 156.44019460768035, 18, 24473.534488888898, -0.9571686373491605, -1441.4466666666676),
+        Row("Manufacturer#2", "almond aquamarine sandy cyan gainsboro", 18, 156.44019460768035, 156.44019460768035, 25, 24473.534488888898, -0.9571686373491605, -1441.4466666666676),
+        Row("Manufacturer#3", "almond antique chartreuse khaki white", 17, 196.77422668858057, 196.77422668858057, 14, 38720.0962888889, 0.5557168646224995, 224.6944444444446),
+        Row("Manufacturer#3", "almond antique chartreuse khaki white", 17, 196.77422668858057, 196.77422668858057, 17, 38720.0962888889, 0.5557168646224995, 224.6944444444446),
+        Row("Manufacturer#3", "almond antique chartreuse khaki white", 17, 196.77422668858057, 196.77422668858057, 19, 38720.0962888889, 0.5557168646224995, 224.6944444444446),
+        Row("Manufacturer#3", "almond antique forest lavender goldenrod", 14, 275.1414418985261, 275.1414418985261, 1, 75702.81305000003, -0.6720833036576083, -1296.9000000000003),
+        Row("Manufacturer#3", "almond antique forest lavender goldenrod", 14, 275.1414418985261, 275.1414418985261, 14, 75702.81305000003, -0.6720833036576083, -1296.9000000000003),
+        Row("Manufacturer#3", "almond antique forest lavender goldenrod", 14, 275.1414418985261, 275.1414418985261, 17, 75702.81305000003, -0.6720833036576083, -1296.9000000000003),
+        Row("Manufacturer#3", "almond antique forest lavender goldenrod", 14, 275.1414418985261, 275.1414418985261, 19, 75702.81305000003, -0.6720833036576083, -1296.9000000000003),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 260.23473614412046, 260.23473614412046, 1, 67722.11789600001, -0.5703526513979519, -2129.0664),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 260.23473614412046, 260.23473614412046, 14, 67722.11789600001, -0.5703526513979519, -2129.0664),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 260.23473614412046, 260.23473614412046, 17, 67722.11789600001, -0.5703526513979519, -2129.0664),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 260.23473614412046, 260.23473614412046, 19, 67722.11789600001, -0.5703526513979519, -2129.0664),
+        Row("Manufacturer#3", "almond antique metallic orange dim", 19, 260.23473614412046, 260.23473614412046, 45, 67722.11789600001, -0.5703526513979519, -2129.0664),
+        Row("Manufacturer#3", "almond antique misty red olive", 1, 275.913996235693, 275.913996235693, 1, 76128.53331875002, -0.5774768996448021, -2547.7868749999993),
+        Row("Manufacturer#3", "almond antique misty red olive", 1, 275.913996235693, 275.913996235693, 14, 76128.53331875002, -0.5774768996448021, -2547.7868749999993),
+        Row("Manufacturer#3", "almond antique misty red olive", 1, 275.913996235693, 275.913996235693, 19, 76128.53331875002, -0.5774768996448021, -2547.7868749999993),
+        Row("Manufacturer#3", "almond antique misty red olive", 1, 275.913996235693, 275.913996235693, 45, 76128.53331875002, -0.5774768996448021, -2547.7868749999993),
+        Row("Manufacturer#3", "almond antique olive coral navajo", 45, 260.58159187137954, 260.58159187137954, 1, 67902.7660222222, -0.8710736366736884, -4099.731111111111),
+        Row("Manufacturer#3", "almond antique olive coral navajo", 45, 260.58159187137954, 260.58159187137954, 19, 67902.7660222222, -0.8710736366736884, -4099.731111111111),
+        Row("Manufacturer#3", "almond antique olive coral navajo", 45, 260.58159187137954, 260.58159187137954, 45, 67902.7660222222, -0.8710736366736884, -4099.731111111111),
+        Row("Manufacturer#4", "almond antique gainsboro frosted violet", 10, 170.1301188959661, 170.1301188959661, 10, 28944.25735555556, -0.6656975320098423, -1347.4777777777779),
+        Row("Manufacturer#4", "almond antique gainsboro frosted violet", 10, 170.1301188959661, 170.1301188959661, 27, 28944.25735555556, -0.6656975320098423, -1347.4777777777779),
+        Row("Manufacturer#4", "almond antique gainsboro frosted violet", 10, 170.1301188959661, 170.1301188959661, 39, 28944.25735555556, -0.6656975320098423, -1347.4777777777779),
+        Row("Manufacturer#4", "almond antique violet mint lemon", 39, 242.26834609323197, 242.26834609323197, 7, 58693.95151875002, -0.8051852719193339, -2537.328125),
+        Row("Manufacturer#4", "almond antique violet mint lemon", 39, 242.26834609323197, 242.26834609323197, 10, 58693.95151875002, -0.8051852719193339, -2537.328125),
+        Row("Manufacturer#4", "almond antique violet mint lemon", 39, 242.26834609323197, 242.26834609323197, 27, 58693.95151875002, -0.8051852719193339, -2537.328125),
+        Row("Manufacturer#4", "almond antique violet mint lemon", 39, 242.26834609323197, 242.26834609323197, 39, 58693.95151875002, -0.8051852719193339, -2537.328125),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 234.10001662537323, 234.10001662537323, 7, 54802.81778400003, -0.6046935574240581, -1719.8079999999995),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 234.10001662537323, 234.10001662537323, 10, 54802.81778400003, -0.6046935574240581, -1719.8079999999995),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 234.10001662537323, 234.10001662537323, 12, 54802.81778400003, -0.6046935574240581, -1719.8079999999995),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 234.10001662537323, 234.10001662537323, 27, 54802.81778400003, -0.6046935574240581, -1719.8079999999995),
+        Row("Manufacturer#4", "almond aquamarine floral ivory bisque", 27, 234.10001662537323, 234.10001662537323, 39, 54802.81778400003, -0.6046935574240581, -1719.8079999999995),
+        Row("Manufacturer#4", "almond aquamarine yellow dodger mint", 7, 247.33427141977316, 247.33427141977316, 7, 61174.241818750015, -0.5508665654707869, -1719.0368749999975),
+        Row("Manufacturer#4", "almond aquamarine yellow dodger mint", 7, 247.33427141977316, 247.33427141977316, 12, 61174.241818750015, -0.5508665654707869, -1719.0368749999975),
+        Row("Manufacturer#4", "almond aquamarine yellow dodger mint", 7, 247.33427141977316, 247.33427141977316, 27, 61174.241818750015, -0.5508665654707869, -1719.0368749999975),
+        Row("Manufacturer#4", "almond aquamarine yellow dodger mint", 7, 247.33427141977316, 247.33427141977316, 39, 61174.241818750015, -0.5508665654707869, -1719.0368749999975),
+        Row("Manufacturer#4", "almond azure aquamarine papaya violet", 12, 283.33443305668936, 283.33443305668936, 7, 80278.4009555556, -0.7755740084632333, -1867.4888888888881),
+        Row("Manufacturer#4", "almond azure aquamarine papaya violet", 12, 283.33443305668936, 283.33443305668936, 12, 80278.4009555556, -0.7755740084632333, -1867.4888888888881),
+        Row("Manufacturer#4", "almond azure aquamarine papaya violet", 12, 283.33443305668936, 283.33443305668936, 27, 80278.4009555556, -0.7755740084632333, -1867.4888888888881),
+        Row("Manufacturer#5", "almond antique blue firebrick mint", 31, 83.69879024746344, 83.69879024746344, 2, 7005.487488888881, 0.3900430308728505, 418.9233333333353),
+        Row("Manufacturer#5", "almond antique blue firebrick mint", 31, 83.69879024746344, 83.69879024746344, 6, 7005.487488888881, 0.3900430308728505, 418.9233333333353),
+        Row("Manufacturer#5", "almond antique blue firebrick mint", 31, 83.69879024746344, 83.69879024746344, 31, 7005.487488888881, 0.3900430308728505, 418.9233333333353),
+        Row("Manufacturer#5", "almond antique medium spring khaki", 6, 316.68049612345885, 316.68049612345885, 2, 100286.53662500005, -0.7136129117761831, -4090.853749999999),
+        Row("Manufacturer#5", "almond antique medium spring khaki", 6, 316.68049612345885, 316.68049612345885, 6, 100286.53662500005, -0.7136129117761831, -4090.853749999999),
+        Row("Manufacturer#5", "almond antique medium spring khaki", 6, 316.68049612345885, 316.68049612345885, 31, 100286.53662500005, -0.7136129117761831, -4090.853749999999),
+        Row("Manufacturer#5", "almond antique medium spring khaki", 6, 316.68049612345885, 316.68049612345885, 46, 100286.53662500005, -0.7136129117761831, -4090.853749999999),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 285.4050629824216, 285.4050629824216, 2, 81456.04997600004, -0.712858514567818, -3297.2011999999986),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 285.4050629824216, 285.4050629824216, 6, 81456.04997600004, -0.712858514567818, -3297.2011999999986),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 285.4050629824216, 285.4050629824216, 23, 81456.04997600004, -0.712858514567818, -3297.2011999999986),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 285.4050629824216, 285.4050629824216, 31, 81456.04997600004, -0.712858514567818, -3297.2011999999986),
+        Row("Manufacturer#5", "almond antique sky peru orange", 2, 285.4050629824216, 285.4050629824216, 46, 81456.04997600004, -0.712858514567818, -3297.2011999999986),
+        Row("Manufacturer#5", "almond aquamarine dodger light gainsboro", 46, 285.43749038756283, 285.43749038756283, 2, 81474.56091875004, -0.9841287871533909, -4871.028125000002),
+        Row("Manufacturer#5", "almond aquamarine dodger light gainsboro", 46, 285.43749038756283, 285.43749038756283, 6, 81474.56091875004, -0.9841287871533909, -4871.028125000002),
+        Row("Manufacturer#5", "almond aquamarine dodger light gainsboro", 46, 285.43749038756283, 285.43749038756283, 23, 81474.56091875004, -0.9841287871533909, -4871.028125000002),
+        Row("Manufacturer#5", "almond aquamarine dodger light gainsboro", 46, 285.43749038756283, 285.43749038756283, 46, 81474.56091875004, -0.9841287871533909, -4871.028125000002),
+        Row("Manufacturer#5", "almond azure blanched chiffon midnight", 23, 315.9225931564038, 315.9225931564038, 2, 99807.08486666666, -0.9978877469246935, -5664.856666666666),
+        Row("Manufacturer#5", "almond azure blanched chiffon midnight", 23, 315.9225931564038, 315.9225931564038, 23, 99807.08486666666, -0.9978877469246935, -5664.856666666666),
+        Row("Manufacturer#5", "almond azure blanched chiffon midnight", 23, 315.9225931564038, 315.9225931564038, 46, 99807.08486666666, -0.9978877469246935, -5664.856666666666)))
+      // scalastyle:on
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-8641][SQL] Native Spark Window functions

Posted by yh...@apache.org.
[SPARK-8641][SQL] Native Spark Window functions

This PR removes Hive windows functions from Spark and replaces them with (native) Spark ones. The PR is on par with Hive in terms of features.

This has the following advantages:
* Better memory management.
* The ability to use spark UDAFs in Window functions.

cc rxin / yhuai

Author: Herman van Hovell <hv...@questtec.nl>

Closes #9819 from hvanhovell/SPARK-8641-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/658f66e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/658f66e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/658f66e6

Branch: refs/heads/master
Commit: 658f66e6208a52367e3b43a6fee9c90f33fb6226
Parents: ed6ebda
Author: Herman van Hovell <hv...@questtec.nl>
Authored: Thu Dec 17 15:16:35 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Dec 17 15:16:35 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  81 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  39 +-
 .../catalyst/analysis/FunctionRegistry.scala    |  12 +-
 .../expressions/aggregate/interfaces.scala      |   7 +-
 .../expressions/windowExpressions.scala         | 318 +++++++--
 .../catalyst/analysis/AnalysisErrorSuite.scala  |  31 +-
 .../org/apache/spark/sql/execution/Window.scala | 649 +++++++++++--------
 .../spark/sql/expressions/WindowSpec.scala      |  54 +-
 .../scala/org/apache/spark/sql/functions.scala  |  16 +-
 .../apache/spark/sql/DataFrameWindowSuite.scala | 295 +++++++++
 .../HiveWindowFunctionQuerySuite.scala          |  10 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   1 -
 .../org/apache/spark/sql/hive/HiveQl.scala      |  22 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    | 224 -------
 .../sql/hive/HiveDataFrameWindowSuite.scala     | 259 --------
 .../sql/hive/execution/WindowQuerySuite.scala   | 230 +++++++
 16 files changed, 1325 insertions(+), 923 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ca00a5e..64dd83a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -77,6 +77,8 @@ class Analyzer(
       ResolveGenerate ::
       ResolveFunctions ::
       ResolveAliases ::
+      ResolveWindowOrder ::
+      ResolveWindowFrame ::
       ExtractWindowExpressions ::
       GlobalAggregates ::
       ResolveAggregateFunctions ::
@@ -127,14 +129,12 @@ class Analyzer(
       // Lookup WindowSpecDefinitions. This rule works with unresolved children.
       case WithWindowDefinition(windowDefinitions, child) =>
         child.transform {
-          case plan => plan.transformExpressions {
+          case p => p.transformExpressions {
             case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
               val errorMessage =
                 s"Window specification $windowName is not defined in the WINDOW clause."
               val windowSpecDefinition =
-                windowDefinitions
-                  .get(windowName)
-                  .getOrElse(failAnalysis(errorMessage))
+                windowDefinitions.getOrElse(windowName, failAnalysis(errorMessage))
               WindowExpression(c, windowSpecDefinition)
           }
         }
@@ -577,6 +577,10 @@ class Analyzer(
                   AggregateExpression(max, Complete, isDistinct = false)
                 case min: Min if isDistinct =>
                   AggregateExpression(min, Complete, isDistinct = false)
+                // AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
+                // the context of a Window clause. They do not need to be wrapped in an
+                // AggregateExpression.
+                case wf: AggregateWindowFunction => wf
                 // We get an aggregate function, we need to wrap it in an AggregateExpression.
                 case agg: AggregateFunction => AggregateExpression(agg, Complete, isDistinct)
                 // This function is not an aggregate function, just return the resolved one.
@@ -597,11 +601,17 @@ class Analyzer(
     }
 
     def containsAggregates(exprs: Seq[Expression]): Boolean = {
-      exprs.foreach(_.foreach {
-        case agg: AggregateExpression => return true
-        case _ =>
-      })
-      false
+      // Collect all Windowed Aggregate Expressions.
+      val windowedAggExprs = exprs.flatMap { expr =>
+        expr.collect {
+          case WindowExpression(ae: AggregateExpression, _) => ae
+        }
+      }.toSet
+
+      // Find the first Aggregate Expression that is not Windowed.
+      exprs.exists(_.collectFirst {
+        case ae: AggregateExpression if !windowedAggExprs.contains(ae) => ae
+      }.isDefined)
     }
   }
 
@@ -875,26 +885,37 @@ class Analyzer(
 
       // Now, we extract regular expressions from expressionsWithWindowFunctions
       // by using extractExpr.
+      val seenWindowAggregates = new ArrayBuffer[AggregateExpression]
       val newExpressionsWithWindowFunctions = expressionsWithWindowFunctions.map {
         _.transform {
           // Extracts children expressions of a WindowFunction (input parameters of
           // a WindowFunction).
           case wf : WindowFunction =>
-            val newChildren = wf.children.map(extractExpr(_))
+            val newChildren = wf.children.map(extractExpr)
             wf.withNewChildren(newChildren)
 
           // Extracts expressions from the partition spec and order spec.
           case wsc @ WindowSpecDefinition(partitionSpec, orderSpec, _) =>
-            val newPartitionSpec = partitionSpec.map(extractExpr(_))
+            val newPartitionSpec = partitionSpec.map(extractExpr)
             val newOrderSpec = orderSpec.map { so =>
               val newChild = extractExpr(so.child)
               so.copy(child = newChild)
             }
             wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec)
 
+          // Extract Windowed AggregateExpression
+          case we @ WindowExpression(
+              AggregateExpression(function, mode, isDistinct),
+              spec: WindowSpecDefinition) =>
+            val newChildren = function.children.map(extractExpr)
+            val newFunction = function.withNewChildren(newChildren).asInstanceOf[AggregateFunction]
+            val newAgg = AggregateExpression(newFunction, mode, isDistinct)
+            seenWindowAggregates += newAgg
+            WindowExpression(newAgg, spec)
+
           // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...),
           // we need to extract SUM(x).
-          case agg: AggregateExpression =>
+          case agg: AggregateExpression if !seenWindowAggregates.contains(agg) =>
             val withName = Alias(agg, s"_w${extractedExprBuffer.length}")()
             extractedExprBuffer += withName
             withName.toAttribute
@@ -1102,6 +1123,42 @@ class Analyzer(
       }
     }
   }
+
+  /**
+   * Check and add proper window frames for all window functions.
+   */
+  object ResolveWindowFrame extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case logical: LogicalPlan => logical transformExpressions {
+        case WindowExpression(wf: WindowFunction,
+        WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
+          if wf.frame != UnspecifiedFrame && wf.frame != f =>
+          failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}")
+        case WindowExpression(wf: WindowFunction,
+        s @ WindowSpecDefinition(_, o, UnspecifiedFrame))
+          if wf.frame != UnspecifiedFrame =>
+          WindowExpression(wf, s.copy(frameSpecification = wf.frame))
+        case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o, UnspecifiedFrame)) =>
+          val frame = SpecifiedWindowFrame.defaultWindowFrame(o.nonEmpty, acceptWindowFrame = true)
+          we.copy(windowSpec = s.copy(frameSpecification = frame))
+      }
+    }
+  }
+
+  /**
+    * Check and add order to [[AggregateWindowFunction]]s.
+    */
+  object ResolveWindowOrder extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      case logical: LogicalPlan => logical transformExpressions {
+        case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
+          failAnalysis(s"WindowFunction $wf requires window to be ordered")
+        case WindowExpression(rank: RankLike, spec) if spec.resolved =>
+          val order = spec.orderSpec.map(_.child)
+          WindowExpression(rank.withOrder(order), spec)
+      }
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7b2c93d..440f679 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -70,15 +70,32 @@ trait CheckAnalysis {
             failAnalysis(
               s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}")
 
-          case WindowExpression(UnresolvedWindowFunction(name, _), _) =>
-            failAnalysis(
-              s"Could not resolve window function '$name'. " +
-              "Note that, using window functions currently requires a HiveContext")
+          case w @ WindowExpression(AggregateExpression(_, _, true), _) =>
+            failAnalysis(s"Distinct window functions are not supported: $w")
+
+          case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
+               SpecifiedWindowFrame(frame,
+                 FrameBoundary(l),
+                 FrameBoundary(h))))
+             if order.isEmpty || frame != RowFrame || l != h =>
+            failAnalysis("An offset window function can only be evaluated in an ordered " +
+              s"row-based window frame with a single offset: $w")
+
+          case w @ WindowExpression(e, s) =>
+            // Only allow window functions with an aggregate expression or an offset window
+            // function.
+            e match {
+              case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
+              case _ =>
+                failAnalysis(s"Expression '$e' not supported within a window function.")
+            }
+            // Make sure the window specification is valid.
+            s.validate match {
+              case Some(m) =>
+                failAnalysis(s"Window specification $s is not valid because $m")
+              case None => w
+            }
 
-          case w @ WindowExpression(windowFunction, windowSpec) if windowSpec.validate.nonEmpty =>
-            // The window spec is not valid.
-            val reason = windowSpec.validate.get
-            failAnalysis(s"Window specification $windowSpec is not valid because $reason")
         }
 
         operator match {
@@ -204,10 +221,12 @@ trait CheckAnalysis {
               s"unresolved operator ${operator.simpleString}")
 
           case o if o.expressions.exists(!_.deterministic) &&
-            !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] & !o.isInstanceOf[Aggregate] =>
+            !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
+            !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] =>
             // The rule above is used to check Aggregate operator.
             failAnalysis(
-              s"""nondeterministic expressions are only allowed in Project or Filter, found:
+              s"""nondeterministic expressions are only allowed in
+                 |Project, Filter, Aggregate or Window, found:
                  | ${o.expressions.map(_.prettyString).mkString(",")}
                  |in operator ${operator.simpleString}
              """.stripMargin)

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index f9c04d7..12c24cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -283,7 +283,17 @@ object FunctionRegistry {
     expression[Sha2]("sha2"),
     expression[SparkPartitionID]("spark_partition_id"),
     expression[InputFileName]("input_file_name"),
-    expression[MonotonicallyIncreasingID]("monotonically_increasing_id")
+    expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
+
+    // window functions
+    expression[Lead]("lead"),
+    expression[Lag]("lag"),
+    expression[RowNumber]("row_number"),
+    expression[CumeDist]("cume_dist"),
+    expression[NTile]("ntile"),
+    expression[Rank]("rank"),
+    expression[DenseRank]("dense_rank"),
+    expression[PercentRank]("percent_rank")
   )
 
   val builtin: SimpleFunctionRegistry = {

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 3b441de..b6d2ddc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenFallback, GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 
@@ -144,9 +144,6 @@ sealed abstract class AggregateFunction extends Expression with ImplicitCastInpu
    */
   def defaultResult: Option[Literal] = None
 
-  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String =
-    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
-
   /**
    * Wraps this [[AggregateFunction]] in an [[AggregateExpression]] because
    * [[AggregateExpression]] is the container of an [[AggregateFunction]], aggregation mode,
@@ -187,7 +184,7 @@ sealed abstract class AggregateFunction extends Expression with ImplicitCastInpu
  * `inputAggBufferOffset`, but not on the correctness of the attribute ids in `aggBufferAttributes`
  * and `inputAggBufferAttributes`.
  */
-abstract class ImperativeAggregate extends AggregateFunction {
+abstract class ImperativeAggregate extends AggregateFunction with CodegenFallback {
 
   /**
    * The offset of this function's first buffer value in the underlying shared mutable aggregation

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 1680aa8..06252ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.types.{DataType, NumericType}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{NoOp, DeclarativeAggregate}
+import org.apache.spark.sql.types._
 
 /**
  * The trait of the Window Specification (specified in the OVER clause or WINDOW clause) for
@@ -117,6 +118,19 @@ sealed trait FrameBoundary {
   def notFollows(other: FrameBoundary): Boolean
 }
 
+/**
+ * Extractor for making working with frame boundaries easier.
+ */
+object FrameBoundary {
+  def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
+  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
+    case CurrentRow => Some(0)
+    case ValuePreceding(offset) => Some(-offset)
+    case ValueFollowing(offset) => Some(offset)
+    case _ => None
+  }
+}
+
 /** UNBOUNDED PRECEDING boundary. */
 case object UnboundedPreceding extends FrameBoundary {
   def notFollows(other: FrameBoundary): Boolean = other match {
@@ -243,85 +257,281 @@ object SpecifiedWindowFrame {
   }
 }
 
+case class UnresolvedWindowExpression(
+    child: Expression,
+    windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable {
+
+  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
+  override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
+  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
+  override lazy val resolved = false
+}
+
+case class WindowExpression(
+    windowFunction: Expression,
+    windowSpec: WindowSpecDefinition) extends Expression with Unevaluable {
+
+  override def children: Seq[Expression] = windowFunction :: windowSpec :: Nil
+
+  override def dataType: DataType = windowFunction.dataType
+  override def foldable: Boolean = windowFunction.foldable
+  override def nullable: Boolean = windowFunction.nullable
+
+  override def toString: String = s"$windowFunction $windowSpec"
+}
+
 /**
- * Every window function needs to maintain a output buffer for its output.
- * It should expect that for a n-row window frame, it will be called n times
- * to retrieve value corresponding with these n rows.
+ * A window function is a function that can only be evaluated in the context of a window operator.
  */
 trait WindowFunction extends Expression {
-  def init(): Unit
+  /** Frame in which the window operator must be executed. */
+  def frame: WindowFrame = UnspecifiedFrame
+}
+
+/**
+ * An offset window function is a window function that returns the value of the input column offset
+ * by a number of rows within the partition. For instance: an OffsetWindowfunction for value x with
+ * offset -2, will get the value of x 2 rows back in the partition.
+ */
+abstract class OffsetWindowFunction
+  extends Expression with WindowFunction with Unevaluable with ImplicitCastInputTypes {
+  /**
+   * Input expression to evaluate against a row which a number of rows below or above (depending on
+   * the value and sign of the offset) the current row.
+   */
+  val input: Expression
+
+  /**
+   * Default result value for the function when the input expression returns NULL. The default will
+   * evaluated against the current row instead of the offset row.
+   */
+  val default: Expression
 
-  def reset(): Unit
+  /**
+   * (Foldable) expression that contains the number of rows between the current row and the row
+   * where the input expression is evaluated.
+   */
+  val offset: Expression
 
-  def prepareInputParameters(input: InternalRow): AnyRef
+  /**
+   * Direction (above = 1/below = -1) of the number of rows between the current row and the row
+   * where the input expression is evaluated.
+   */
+  val direction: SortDirection
 
-  def update(input: AnyRef): Unit
+  override def children: Seq[Expression] = Seq(input, offset, default)
 
-  def batchUpdate(inputs: Array[AnyRef]): Unit
+  /*
+   * The result of an OffsetWindowFunction is dependent on the frame in which the
+   * OffsetWindowFunction is executed, the input expression and the default expression. Even when
+   * both the input and the default expression are foldable, the result is still not foldable due to
+   * the frame.
+   */
+  override def foldable: Boolean = input.foldable && (default == null || default.foldable)
 
-  def evaluate(): Unit
+  override def nullable: Boolean = input.nullable && (default == null || default.nullable)
 
-  def get(index: Int): Any
+  override lazy val frame = {
+    // This will be triggered by the Analyzer.
+    val offsetValue = offset.eval() match {
+      case o: Int => o
+      case x => throw new AnalysisException(
+        s"Offset expression must be a foldable integer expression: $x")
+    }
+    val boundary = direction match {
+      case Ascending => ValueFollowing(offsetValue)
+      case Descending => ValuePreceding(offsetValue)
+    }
+    SpecifiedWindowFrame(RowFrame, boundary, boundary)
+  }
+
+  override def dataType: DataType = input.dataType
 
-  def newInstance(): WindowFunction
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq(AnyDataType, IntegerType, TypeCollection(input.dataType, NullType))
+
+  override def toString: String = s"$prettyName($input, $offset, $default)"
 }
 
-case class UnresolvedWindowFunction(
-    name: String,
-    children: Seq[Expression])
-  extends Expression with WindowFunction with Unevaluable {
+case class Lead(input: Expression, offset: Expression, default: Expression)
+    extends OffsetWindowFunction {
 
-  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
-  override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
-  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
-  override lazy val resolved = false
+  def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))
 
-  override def init(): Unit = throw new UnresolvedException(this, "init")
-  override def reset(): Unit = throw new UnresolvedException(this, "reset")
-  override def prepareInputParameters(input: InternalRow): AnyRef =
-    throw new UnresolvedException(this, "prepareInputParameters")
-  override def update(input: AnyRef): Unit = throw new UnresolvedException(this, "update")
-  override def batchUpdate(inputs: Array[AnyRef]): Unit =
-    throw new UnresolvedException(this, "batchUpdate")
-  override def evaluate(): Unit = throw new UnresolvedException(this, "evaluate")
-  override def get(index: Int): Any = throw new UnresolvedException(this, "get")
+  def this(input: Expression) = this(input, Literal(1))
 
-  override def toString: String = s"'$name(${children.mkString(",")})"
+  def this() = this(Literal(null))
 
-  override def newInstance(): WindowFunction = throw new UnresolvedException(this, "newInstance")
+  override val direction = Ascending
 }
 
-case class UnresolvedWindowExpression(
-    child: UnresolvedWindowFunction,
-    windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable {
+case class Lag(input: Expression, offset: Expression, default: Expression)
+    extends OffsetWindowFunction {
 
-  override def dataType: DataType = throw new UnresolvedException(this, "dataType")
-  override def foldable: Boolean = throw new UnresolvedException(this, "foldable")
-  override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
-  override lazy val resolved = false
+  def this(input: Expression, offset: Expression) = this(input, offset, Literal(null))
+
+  def this(input: Expression) = this(input, Literal(1))
+
+  def this() = this(Literal(null))
+
+  override val direction = Descending
 }
 
-case class WindowExpression(
-    windowFunction: WindowFunction,
-    windowSpec: WindowSpecDefinition) extends Expression with Unevaluable {
+abstract class AggregateWindowFunction extends DeclarativeAggregate with WindowFunction {
+  self: Product =>
+  override val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
+  override def dataType: DataType = IntegerType
+  override def nullable: Boolean = false
+  override def supportsPartial: Boolean = false
+  override lazy val mergeExpressions =
+    throw new UnsupportedOperationException("Window Functions do not support merging.")
+}
 
-  override def children: Seq[Expression] = windowFunction :: windowSpec :: Nil
+abstract class RowNumberLike extends AggregateWindowFunction {
+  override def children: Seq[Expression] = Nil
+  override def inputTypes: Seq[AbstractDataType] = Nil
+  protected val zero = Literal(0)
+  protected val one = Literal(1)
+  protected val rowNumber = AttributeReference("rowNumber", IntegerType, nullable = false)()
+  override val aggBufferAttributes: Seq[AttributeReference] = rowNumber :: Nil
+  override val initialValues: Seq[Expression] = zero :: Nil
+  override val updateExpressions: Seq[Expression] = Add(rowNumber, one) :: Nil
+}
 
-  override def dataType: DataType = windowFunction.dataType
-  override def foldable: Boolean = windowFunction.foldable
-  override def nullable: Boolean = windowFunction.nullable
+/**
+ * A [[SizeBasedWindowFunction]] needs the size of the current window for its calculation.
+ */
+trait SizeBasedWindowFunction extends AggregateWindowFunction {
+  protected def n: AttributeReference = SizeBasedWindowFunction.n
+}
 
-  override def toString: String = s"$windowFunction $windowSpec"
+object SizeBasedWindowFunction {
+  val n = AttributeReference("window__partition__size", IntegerType, nullable = false)()
+}
+
+case class RowNumber() extends RowNumberLike {
+  override val evaluateExpression = rowNumber
+}
+
+case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
+  override def dataType: DataType = DoubleType
+  // The frame for CUME_DIST is Range based instead of Row based, because CUME_DIST must
+  // return the same value for equal values in the partition.
+  override val frame = SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+  override val evaluateExpression = Divide(Cast(rowNumber, DoubleType), Cast(n, DoubleType))
+}
+
+case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
+  def this() = this(Literal(1))
+
+  // Validate buckets. Note that this could be relaxed, the bucket value only needs to constant
+  // for each partition.
+  buckets.eval() match {
+    case b: Int if b > 0 => // Ok
+    case x => throw new AnalysisException(
+      "Buckets expression must be a foldable positive integer expression: $x")
+  }
+
+  private val bucket = AttributeReference("bucket", IntegerType, nullable = false)()
+  private val bucketThreshold =
+    AttributeReference("bucketThreshold", IntegerType, nullable = false)()
+  private val bucketSize = AttributeReference("bucketSize", IntegerType, nullable = false)()
+  private val bucketsWithPadding =
+    AttributeReference("bucketsWithPadding", IntegerType, nullable = false)()
+  private def bucketOverflow(e: Expression) =
+    If(GreaterThanOrEqual(rowNumber, bucketThreshold), e, zero)
+
+  override val aggBufferAttributes = Seq(
+    rowNumber,
+    bucket,
+    bucketThreshold,
+    bucketSize,
+    bucketsWithPadding
+  )
+
+  override val initialValues = Seq(
+    zero,
+    zero,
+    zero,
+    Cast(Divide(n, buckets), IntegerType),
+    Cast(Remainder(n, buckets), IntegerType)
+  )
+
+  override val updateExpressions = Seq(
+    Add(rowNumber, one),
+    Add(bucket, bucketOverflow(one)),
+    Add(bucketThreshold, bucketOverflow(
+      Add(bucketSize, If(LessThan(bucket, bucketsWithPadding), one, zero)))),
+    NoOp,
+    NoOp
+  )
+
+  override val evaluateExpression = bucket
 }
 
 /**
- * Extractor for making working with frame boundaries easier.
+ * A RankLike function is a WindowFunction that changes its value based on a change in the value of
+ * the order of the window in which is processed. For instance, when the value of 'x' changes in a
+ * window ordered by 'x' the rank function also changes. The size of the change of the rank function
+ * is (typically) not dependent on the size of the change in 'x'.
  */
-object FrameBoundaryExtractor {
-  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
-    case CurrentRow => Some(0)
-    case ValuePreceding(offset) => Some(-offset)
-    case ValueFollowing(offset) => Some(offset)
-    case _ => None
+abstract class RankLike extends AggregateWindowFunction {
+  override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType)
+
+  /** Store the values of the window 'order' expressions. */
+  protected val orderAttrs = children.map{ expr =>
+    AttributeReference(expr.prettyString, expr.dataType)()
   }
+
+  /** Predicate that detects if the order attributes have changed. */
+  protected val orderEquals = children.zip(orderAttrs)
+    .map(EqualNullSafe.tupled)
+    .reduceOption(And)
+    .getOrElse(Literal(true))
+
+  protected val orderInit = children.map(e => Literal.create(null, e.dataType))
+  protected val rank = AttributeReference("rank", IntegerType, nullable = false)()
+  protected val rowNumber = AttributeReference("rowNumber", IntegerType, nullable = false)()
+  protected val zero = Literal(0)
+  protected val one = Literal(1)
+  protected val increaseRowNumber = Add(rowNumber, one)
+
+  /**
+   * Different RankLike implementations use different source expressions to update their rank value.
+   * Rank for instance uses the number of rows seen, whereas DenseRank uses the number of changes.
+   */
+  protected def rankSource: Expression = rowNumber
+
+  /** Increase the rank when the current rank == 0 or when the one of order attributes changes. */
+  protected val increaseRank = If(And(orderEquals, Not(EqualTo(rank, zero))), rank, rankSource)
+
+  override val aggBufferAttributes: Seq[AttributeReference] = rank +: rowNumber +: orderAttrs
+  override val initialValues = zero +: one +: orderInit
+  override val updateExpressions = increaseRank +: increaseRowNumber +: children
+  override val evaluateExpression: Expression = rank
+
+  def withOrder(order: Seq[Expression]): RankLike
+}
+
+case class Rank(children: Seq[Expression]) extends RankLike {
+  def this() = this(Nil)
+  override def withOrder(order: Seq[Expression]): Rank = Rank(order)
+}
+
+case class DenseRank(children: Seq[Expression]) extends RankLike {
+  def this() = this(Nil)
+  override def withOrder(order: Seq[Expression]): DenseRank = DenseRank(order)
+  override protected def rankSource = Add(rank, one)
+  override val updateExpressions = increaseRank +: children
+  override val aggBufferAttributes = rank +: orderAttrs
+  override val initialValues = zero +: orderInit
+}
+
+case class PercentRank(children: Seq[Expression]) extends RankLike with SizeBasedWindowFunction {
+  def this() = this(Nil)
+  override def withOrder(order: Seq[Expression]): PercentRank = PercentRank(order)
+  override def dataType: DataType = DoubleType
+  override val evaluateExpression = If(GreaterThan(n, one),
+      Divide(Cast(Subtract(rank, one), DoubleType), Cast(Subtract(n, one), DoubleType)),
+      Literal(0.0d))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index ee43557..1207999 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Count, Sum, AggregateExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -133,17 +134,37 @@ class AnalysisErrorSuite extends AnalysisTest {
     "requires int type" :: "'null' is of date type" :: Nil)
 
   errorTest(
-    "unresolved window function",
+    "invalid window function",
     testRelation2.select(
       WindowExpression(
-        UnresolvedWindowFunction(
-          "lead",
-          UnresolvedAttribute("c") :: Nil),
+        Literal(0),
         WindowSpecDefinition(
           UnresolvedAttribute("a") :: Nil,
           SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
           UnspecifiedFrame)).as('window)),
-    "lead" :: "window functions currently requires a HiveContext" :: Nil)
+    "not supported within a window function" :: Nil)
+
+  errorTest(
+    "distinct window function",
+    testRelation2.select(
+      WindowExpression(
+        AggregateExpression(Count(UnresolvedAttribute("b")), Complete, isDistinct = true),
+        WindowSpecDefinition(
+          UnresolvedAttribute("a") :: Nil,
+          SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
+          UnspecifiedFrame)).as('window)),
+    "Distinct window functions are not supported" :: Nil)
+
+  errorTest(
+    "offset window function",
+    testRelation2.select(
+      WindowExpression(
+        new Lead(UnresolvedAttribute("b")),
+        WindowSpecDefinition(
+          UnresolvedAttribute("a") :: Nil,
+          SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
+          SpecifiedWindowFrame(RangeFrame, ValueFollowing(1), ValueFollowing(2)))).as('window)),
+    "window frame" :: "must match the required frame" :: Nil)
 
   errorTest(
     "too many generators",

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index b1280c3..9852b6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.execution
 
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -42,6 +45,8 @@ import org.apache.spark.util.collection.CompactBuffer
  * - Moving frame: Every time we move to a new row to process, we remove some rows from the frame
  *   and we add some rows to the frame. Examples are:
  *     1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING.
+ * - Offset frame: The frame consist of one row, which is an offset number of rows away from the
+ *   current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame.
  *
  * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
  * boundary can be either Row or Range based:
@@ -122,12 +127,10 @@ case class Window(
           // Create the projection which returns the current 'value'.
           val current = newMutableProjection(expr :: Nil, child.output)()
           // Flip the sign of the offset when processing the order is descending
-          val boundOffset =
-            if (sortExpr.direction == Descending) {
-              -offset
-            } else {
-              offset
-            }
+          val boundOffset = sortExpr.direction match {
+            case Descending => -offset
+            case Ascending => offset
+          }
           // Create the projection which returns the current 'value' modified by adding the offset.
           val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
           val bound = newMutableProjection(boundExpr :: Nil, child.output)()
@@ -149,43 +152,102 @@ case class Window(
   }
 
   /**
-   * Create a frame processor.
-   *
-   * This method uses Code Generation. It can only be used on the executor side.
-   *
-   * @param frame boundaries.
-   * @param functions to process in the frame.
-   * @param ordinal at which the processor starts writing to the output.
-   * @return a frame processor.
+   * Collection containing an entry for each window frame to process. Each entry contains a frames'
+   * WindowExpressions and factory function for the WindowFrameFunction.
    */
-  private[this] def createFrameProcessor(
-      frame: WindowFrame,
-      functions: Array[WindowFunction],
-      ordinal: Int): WindowFunctionFrame = frame match {
-    // Growing Frame.
-    case SpecifiedWindowFrame(frameType, UnboundedPreceding, FrameBoundaryExtractor(high)) =>
-      val uBoundOrdering = createBoundOrdering(frameType, high)
-      new UnboundedPrecedingWindowFunctionFrame(ordinal, functions, uBoundOrdering)
-
-    // Shrinking Frame.
-    case SpecifiedWindowFrame(frameType, FrameBoundaryExtractor(low), UnboundedFollowing) =>
-      val lBoundOrdering = createBoundOrdering(frameType, low)
-      new UnboundedFollowingWindowFunctionFrame(ordinal, functions, lBoundOrdering)
-
-    // Moving Frame.
-    case SpecifiedWindowFrame(frameType,
-        FrameBoundaryExtractor(low), FrameBoundaryExtractor(high)) =>
-      val lBoundOrdering = createBoundOrdering(frameType, low)
-      val uBoundOrdering = createBoundOrdering(frameType, high)
-      new SlidingWindowFunctionFrame(ordinal, functions, lBoundOrdering, uBoundOrdering)
-
-    // Entire Partition Frame.
-    case SpecifiedWindowFrame(_, UnboundedPreceding, UnboundedFollowing) =>
-      new UnboundedWindowFunctionFrame(ordinal, functions)
-
-    // Error
-    case fr =>
-      sys.error(s"Unsupported Frame $fr for functions: $functions")
+  private[this] lazy val windowFrameExpressionFactoryPairs = {
+    type FrameKey = (String, FrameType, Option[Int], Option[Int])
+    type ExpressionBuffer = mutable.Buffer[Expression]
+    val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
+
+    // Add a function and its function to the map for a given frame.
+    def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
+      val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
+      val (es, fns) = framedFunctions.getOrElseUpdate(
+        key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
+      es.append(e)
+      fns.append(fn)
+    }
+
+    // Collect all valid window functions and group them by their frame.
+    windowExpression.foreach { x =>
+      x.foreach {
+        case e @ WindowExpression(function, spec) =>
+          val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+          function match {
+            case AggregateExpression(f, _, _) => collect("AGGREGATE", frame, e, f)
+            case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
+            case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
+            case f => sys.error(s"Unsupported window function: $f")
+          }
+        case _ =>
+      }
+    }
+
+    // Map the groups to a (unbound) expression and frame factory pair.
+    var numExpressions = 0
+    framedFunctions.toSeq.map {
+      case (key, (expressions, functionSeq)) =>
+        val ordinal = numExpressions
+        val functions = functionSeq.toArray
+
+        // Construct an aggregate processor if we need one.
+        def processor = AggregateProcessor(functions, ordinal, child.output, newMutableProjection)
+
+        // Create the factory
+        val factory = key match {
+          // Offset Frame
+          case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
+            target: MutableRow =>
+              new OffsetWindowFunctionFrame(
+                target,
+                ordinal,
+                functions,
+                child.output,
+                newMutableProjection,
+                offset)
+
+          // Growing Frame.
+          case ("AGGREGATE", frameType, None, Some(high)) =>
+            target: MutableRow => {
+              new UnboundedPrecedingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, high))
+            }
+
+          // Shrinking Frame.
+          case ("AGGREGATE", frameType, Some(low), None) =>
+            target: MutableRow => {
+              new UnboundedFollowingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, low))
+            }
+
+          // Moving Frame.
+          case ("AGGREGATE", frameType, Some(low), Some(high)) =>
+            target: MutableRow => {
+              new SlidingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, low),
+                createBoundOrdering(frameType, high))
+            }
+
+          // Entire Partition Frame.
+          case ("AGGREGATE", frameType, None, None) =>
+            target: MutableRow => {
+              new UnboundedWindowFunctionFrame(target, processor)
+            }
+        }
+
+        // Keep track of the number of expressions. This is a side-effect in a map...
+        numExpressions += expressions.size
+
+        // Create the Frame Expression - Factory pair.
+        (expressions, factory)
+    }
   }
 
   /**
@@ -210,43 +272,16 @@ case class Window(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    // Prepare processing.
-    // Group the window expression by their processing frame.
-    val windowExprs = windowExpression.flatMap {
-      _.collect {
-        case e: WindowExpression => e
-      }
-    }
-
-    // Create Frame processor factories and order the unbound window expressions by the frame they
-    // are processed in; this is the order in which their results will be written to window
-    // function result buffer.
-    val framedWindowExprs = windowExprs.groupBy(_.windowSpec.frameSpecification)
-    val factories = Array.ofDim[() => WindowFunctionFrame](framedWindowExprs.size)
-    val unboundExpressions = scala.collection.mutable.Buffer.empty[Expression]
-    framedWindowExprs.zipWithIndex.foreach {
-      case ((frame, unboundFrameExpressions), index) =>
-        // Track the ordinal.
-        val ordinal = unboundExpressions.size
-
-        // Track the unbound expressions
-        unboundExpressions ++= unboundFrameExpressions
-
-        // Bind the expressions.
-        val functions = unboundFrameExpressions.map { e =>
-          BindReferences.bindReference(e.windowFunction, child.output)
-        }.toArray
-
-        // Create the frame processor factory.
-        factories(index) = () => createFrameProcessor(frame, functions, ordinal)
-    }
+    // Unwrap the expressions and factories from the map.
+    val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
+    val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
 
     // Start processing.
     child.execute().mapPartitions { stream =>
       new Iterator[InternalRow] {
 
         // Get all relevant projections.
-        val result = createResultProjection(unboundExpressions)
+        val result = createResultProjection(expressions)
         val grouping = UnsafeProjection.create(partitionSpec, child.output)
 
         // Manage the stream and the grouping.
@@ -266,14 +301,15 @@ case class Window(
         fetchNextRow()
 
         // Manage the current partition.
-        var rows: CompactBuffer[InternalRow] = _
-        val frames: Array[WindowFunctionFrame] = factories.map(_())
+        val rows = ArrayBuffer.empty[InternalRow]
+        val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+        val frames = factories.map(_(windowFunctionResult))
         val numFrames = frames.length
         private[this] def fetchNextPartition() {
           // Collect all the rows in the current partition.
           // Before we start to fetch new input rows, make a copy of nextGroup.
           val currentGroup = nextGroup.copy()
-          rows = new CompactBuffer
+          rows.clear()
           while (nextRowAvailable && nextGroup == currentGroup) {
             rows += nextRow.copy()
             fetchNextRow()
@@ -297,7 +333,6 @@ case class Window(
         override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable
 
         val join = new JoinedRow
-        val windowFunctionResult = new GenericMutableRow(unboundExpressions.size)
         override final def next(): InternalRow = {
           // Load the next partition if we need to.
           if (rowIndex >= rowsSize && nextRowAvailable) {
@@ -308,7 +343,7 @@ case class Window(
             // Get the results for the window frames.
             var i = 0
             while (i < numFrames) {
-              frames(i).write(windowFunctionResult)
+              frames(i).write()
               i += 1
             }
 
@@ -355,140 +390,96 @@ private[execution] final case class RangeBoundOrdering(
  * A window function calculates the results of a number of window functions for a window frame.
  * Before use a frame must be prepared by passing it all the rows in the current partition. After
  * preparation the update method can be called to fill the output rows.
- *
- * TODO How to improve performance? A few thoughts:
- * - Window functions are expensive due to its distribution and ordering requirements.
- * Unfortunately it is up to the Spark engine to solve this. Improvements in the form of project
- * Tungsten are on the way.
- * - The window frame processing bit can be improved though. But before we start doing that we
- * need to see how much of the time and resources are spent on partitioning and ordering, and
- * how much time and resources are spent processing the partitions. There are a couple ways to
- * improve on the current situation:
- * - Reduce memory footprint by performing streaming calculations. This can only be done when
- * there are no Unbound/Unbounded Following calculations present.
- * - Use Tungsten style memory usage.
- * - Use code generation in general, and use the approach to aggregation taken in the
- *   GeneratedAggregate class in specific.
- *
- * @param ordinal of the first column written by this frame.
- * @param functions to calculate the row values with.
  */
-private[execution] abstract class WindowFunctionFrame(
-    ordinal: Int,
-    functions: Array[WindowFunction]) {
-
-  // Make sure functions are initialized.
-  functions.foreach(_.init())
-
-  /** Number of columns the window function frame is managing */
-  val numColumns = functions.length
-
-  /**
-   * Create a fresh thread safe copy of the frame.
-   *
-   * @return the copied frame.
-   */
-  def copy: WindowFunctionFrame
-
-  /**
-   * Create new instances of the functions.
-   *
-   * @return an array containing copies of the current window functions.
-   */
-  protected final def copyFunctions: Array[WindowFunction] = functions.map(_.newInstance())
-
+private[execution] abstract class WindowFunctionFrame {
   /**
    * Prepare the frame for calculating the results for a partition.
    *
    * @param rows to calculate the frame results for.
    */
-  def prepare(rows: CompactBuffer[InternalRow]): Unit
+  def prepare(rows: ArrayBuffer[InternalRow]): Unit
 
   /**
-   * Write the result for the current row to the given target row.
-   *
-   * @param target row to write the result for the current row to.
+   * Write the current results to the target row.
    */
-  def write(target: GenericMutableRow): Unit
+  def write(): Unit
+}
 
-  /** Reset the current window functions. */
-  protected final def reset(): Unit = {
-    var i = 0
-    while (i < numColumns) {
-      functions(i).reset()
-      i += 1
-    }
-  }
+/**
+ * The offset window frame calculates frames containing LEAD/LAG statements.
+ *
+ * @param target to write results to.
+ * @param expressions to shift a number of rows.
+ * @param inputSchema required for creating a projection.
+ * @param newMutableProjection function used to create the projection.
+ * @param offset by which rows get moved within a partition.
+ */
+private[execution] final class OffsetWindowFunctionFrame(
+    target: MutableRow,
+    ordinal: Int,
+    expressions: Array[Expression],
+    inputSchema: Seq[Attribute],
+    newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => MutableProjection,
+    offset: Int) extends WindowFunctionFrame {
 
-  /** Prepare an input row for processing. */
-  protected final def prepare(input: InternalRow): Array[AnyRef] = {
-    val prepared = new Array[AnyRef](numColumns)
-    var i = 0
-    while (i < numColumns) {
-      prepared(i) = functions(i).prepareInputParameters(input)
-      i += 1
-    }
-    prepared
-  }
+  /** Rows of the partition currently being processed. */
+  private[this] var input: ArrayBuffer[InternalRow] = null
 
-  /** Evaluate a prepared buffer (iterator). */
-  protected final def evaluatePrepared(iterator: java.util.Iterator[Array[AnyRef]]): Unit = {
-    reset()
-    while (iterator.hasNext) {
-      val prepared = iterator.next()
-      var i = 0
-      while (i < numColumns) {
-        functions(i).update(prepared(i))
-        i += 1
-      }
-    }
-    evaluate()
-  }
+  /** Index of the input row currently used for output. */
+  private[this] var inputIndex = 0
 
-  /** Evaluate a prepared buffer (array). */
-  protected final def evaluatePrepared(prepared: Array[Array[AnyRef]],
-      fromIndex: Int, toIndex: Int): Unit = {
-    var i = 0
-    while (i < numColumns) {
-      val function = functions(i)
-      function.reset()
-      var j = fromIndex
-      while (j < toIndex) {
-        function.update(prepared(j)(i))
-        j += 1
-      }
-      function.evaluate()
-      i += 1
-    }
-  }
+  /** Index of the current output row. */
+  private[this] var outputIndex = 0
 
-  /** Update an array of window functions. */
-  protected final def update(input: InternalRow): Unit = {
-    var i = 0
-    while (i < numColumns) {
-      val aggregate = functions(i)
-      val preparedInput = aggregate.prepareInputParameters(input)
-      aggregate.update(preparedInput)
-      i += 1
+  /** Row used when there is no valid input. */
+  private[this] val emptyRow = new GenericInternalRow(inputSchema.size)
+
+  /** Row used to combine the offset and the current row. */
+  private[this] val join = new JoinedRow
+
+  /** Create the projection. */
+  private[this] val projection = {
+    // Collect the expressions and bind them.
+    val numInputAttributes = inputSchema.size
+    val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map {
+      case e: OffsetWindowFunction =>
+        val input = BindReferences.bindReference(e.input, inputSchema)
+        if (e.default == null || e.default.foldable && e.default.eval() == null) {
+          // Without default value.
+          input
+        } else {
+          // With default value.
+          val default = BindReferences.bindReference(e.default, inputSchema).transform {
+            // Shift the input reference to its default version.
+            case BoundReference(o, dataType, nullable) =>
+              BoundReference(o + numInputAttributes, dataType, nullable)
+          }
+          org.apache.spark.sql.catalyst.expressions.Coalesce(input :: default :: Nil)
+        }
+      case e =>
+        BindReferences.bindReference(e, inputSchema)
     }
+
+    // Create the projection.
+    newMutableProjection(boundExpressions, Nil)().target(target)
   }
 
-  /** Evaluate the window functions. */
-  protected final def evaluate(): Unit = {
-    var i = 0
-    while (i < numColumns) {
-      functions(i).evaluate()
-      i += 1
-    }
+  override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+    input = rows
+    inputIndex = offset
+    outputIndex = 0
   }
 
-  /** Fill a target row with the current window function results. */
-  protected final def fill(target: GenericMutableRow, rowIndex: Int): Unit = {
-    var i = 0
-    while (i < numColumns) {
-      target.update(ordinal + i, functions(i).get(rowIndex))
-      i += 1
+  override def write(): Unit = {
+    val size = input.size
+    if (inputIndex >= 0 && inputIndex < size) {
+      join(input(inputIndex), input(outputIndex))
+    } else {
+      join(emptyRow, input(outputIndex))
     }
+    projection(join)
+    inputIndex += 1
+    outputIndex += 1
   }
 }
 
@@ -496,19 +487,19 @@ private[execution] abstract class WindowFunctionFrame(
  * The sliding window frame calculates frames with the following SQL form:
  * ... BETWEEN 1 PRECEDING AND 1 FOLLOWING
  *
- * @param ordinal of the first column written by this frame.
- * @param functions to calculate the row values with.
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
  * @param lbound comparator used to identify the lower bound of an output row.
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[execution] final class SlidingWindowFunctionFrame(
-    ordinal: Int,
-    functions: Array[WindowFunction],
+    target: MutableRow,
+    processor: AggregateProcessor,
     lbound: BoundOrdering,
-    ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) {
+    ubound: BoundOrdering) extends WindowFunctionFrame {
 
   /** Rows of the partition currently being processed. */
-  private[this] var input: CompactBuffer[InternalRow] = null
+  private[this] var input: ArrayBuffer[InternalRow] = null
 
   /** Index of the first input row with a value greater than the upper bound of the current
     * output row. */
@@ -518,30 +509,25 @@ private[execution] final class SlidingWindowFunctionFrame(
     * current output row. */
   private[this] var inputLowIndex = 0
 
-  /** Buffer used for storing prepared input for the window functions. */
-  private[this] val buffer = new java.util.ArrayDeque[Array[AnyRef]]
-
   /** Index of the row we are currently writing. */
   private[this] var outputIndex = 0
 
   /** Prepare the frame for calculating a new partition. Reset all variables. */
-  override def prepare(rows: CompactBuffer[InternalRow]): Unit = {
+  override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
     input = rows
     inputHighIndex = 0
     inputLowIndex = 0
     outputIndex = 0
-    buffer.clear()
   }
 
   /** Write the frame columns for the current row to the given target row. */
-  override def write(target: GenericMutableRow): Unit = {
+  override def write(): Unit = {
     var bufferUpdated = outputIndex == 0
 
     // Add all rows to the buffer for which the input row value is equal to or less than
     // the output row upper bound.
     while (inputHighIndex < input.size &&
-        ubound.compare(input, inputHighIndex, outputIndex) <= 0) {
-      buffer.offer(prepare(input(inputHighIndex)))
+      ubound.compare(input, inputHighIndex, outputIndex) <= 0) {
       inputHighIndex += 1
       bufferUpdated = true
     }
@@ -549,25 +535,21 @@ private[execution] final class SlidingWindowFunctionFrame(
     // Drop all rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
     while (inputLowIndex < inputHighIndex &&
-        lbound.compare(input, inputLowIndex, outputIndex) < 0) {
-      buffer.pop()
+      lbound.compare(input, inputLowIndex, outputIndex) < 0) {
       inputLowIndex += 1
       bufferUpdated = true
     }
 
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
-      evaluatePrepared(buffer.iterator())
-      fill(target, outputIndex)
+      processor.initialize(input.size)
+      processor.update(input, inputLowIndex, inputHighIndex)
+      processor.evaluate(target)
     }
 
     // Move to the next row.
     outputIndex += 1
   }
-
-  /** Copy the frame. */
-  override def copy: SlidingWindowFunctionFrame =
-    new SlidingWindowFunctionFrame(ordinal, copyFunctions, lbound, ubound)
 }
 
 /**
@@ -578,36 +560,25 @@ private[execution] final class SlidingWindowFunctionFrame(
  * Its results are  the same for each and every row in the partition. This class can be seen as a
  * special case of a sliding window, but is optimized for the unbound case.
  *
- * @param ordinal of the first column written by this frame.
- * @param functions to calculate the row values with.
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
  */
 private[execution] final class UnboundedWindowFunctionFrame(
-    ordinal: Int,
-    functions: Array[WindowFunction]) extends WindowFunctionFrame(ordinal, functions) {
-
-  /** Index of the row we are currently writing. */
-  private[this] var outputIndex = 0
+    target: MutableRow,
+    processor: AggregateProcessor) extends WindowFunctionFrame {
 
   /** Prepare the frame for calculating a new partition. Process all rows eagerly. */
-  override def prepare(rows: CompactBuffer[InternalRow]): Unit = {
-    reset()
-    outputIndex = 0
-    val iterator = rows.iterator
-    while (iterator.hasNext) {
-      update(iterator.next())
-    }
-    evaluate()
+  override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
+    processor.initialize(rows.size)
+    processor.update(rows, 0, rows.size)
   }
 
   /** Write the frame columns for the current row to the given target row. */
-  override def write(target: GenericMutableRow): Unit = {
-    fill(target, outputIndex)
-    outputIndex += 1
+  override def write(): Unit = {
+    // Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
+    // for each row.
+    processor.evaluate(target)
   }
-
-  /** Copy the frame. */
-  override def copy: UnboundedWindowFunctionFrame =
-    new UnboundedWindowFunctionFrame(ordinal, copyFunctions)
 }
 
 /**
@@ -620,58 +591,53 @@ private[execution] final class UnboundedWindowFunctionFrame(
  * is not the case when there is no lower bound, given the additive nature of most aggregates
  * streaming updates and partial evaluation suffice and no buffering is needed.
  *
- * @param ordinal of the first column written by this frame.
- * @param functions to calculate the row values with.
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
  * @param ubound comparator used to identify the upper bound of an output row.
  */
 private[execution] final class UnboundedPrecedingWindowFunctionFrame(
-    ordinal: Int,
-    functions: Array[WindowFunction],
-    ubound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) {
+    target: MutableRow,
+    processor: AggregateProcessor,
+    ubound: BoundOrdering) extends WindowFunctionFrame {
 
   /** Rows of the partition currently being processed. */
-  private[this] var input: CompactBuffer[InternalRow] = null
+  private[this] var input: ArrayBuffer[InternalRow] = null
 
   /** Index of the first input row with a value greater than the upper bound of the current
-    * output row. */
+   * output row. */
   private[this] var inputIndex = 0
 
   /** Index of the row we are currently writing. */
   private[this] var outputIndex = 0
 
   /** Prepare the frame for calculating a new partition. */
-  override def prepare(rows: CompactBuffer[InternalRow]): Unit = {
-    reset()
+  override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
     input = rows
     inputIndex = 0
     outputIndex = 0
+    processor.initialize(input.size)
   }
 
   /** Write the frame columns for the current row to the given target row. */
-  override def write(target: GenericMutableRow): Unit = {
+  override def write(): Unit = {
     var bufferUpdated = outputIndex == 0
 
     // Add all rows to the aggregates for which the input row value is equal to or less than
     // the output row upper bound.
     while (inputIndex < input.size && ubound.compare(input, inputIndex, outputIndex) <= 0) {
-      update(input(inputIndex))
+      processor.update(input(inputIndex))
       inputIndex += 1
       bufferUpdated = true
     }
 
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
-      evaluate()
-      fill(target, outputIndex)
+      processor.evaluate(target)
     }
 
     // Move to the next row.
     outputIndex += 1
   }
-
-  /** Copy the frame. */
-  override def copy: UnboundedPrecedingWindowFunctionFrame =
-    new UnboundedPrecedingWindowFunctionFrame(ordinal, copyFunctions, ubound)
 }
 
 /**
@@ -686,45 +652,34 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
  * buffer and must do full recalculation after each row. Reverse iteration would be possible, if
  * the communitativity of the used window functions can be guaranteed.
  *
- * @param ordinal of the first column written by this frame.
- * @param functions to calculate the row values with.
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
  * @param lbound comparator used to identify the lower bound of an output row.
  */
 private[execution] final class UnboundedFollowingWindowFunctionFrame(
-    ordinal: Int,
-    functions: Array[WindowFunction],
-    lbound: BoundOrdering) extends WindowFunctionFrame(ordinal, functions) {
-
-  /** Buffer used for storing prepared input for the window functions. */
-  private[this] var buffer: Array[Array[AnyRef]] = _
+    target: MutableRow,
+    processor: AggregateProcessor,
+    lbound: BoundOrdering) extends WindowFunctionFrame {
 
   /** Rows of the partition currently being processed. */
-  private[this] var input: CompactBuffer[InternalRow] = null
+  private[this] var input: ArrayBuffer[InternalRow] = null
 
   /** Index of the first input row with a value equal to or greater than the lower bound of the
-    * current output row. */
+   * current output row. */
   private[this] var inputIndex = 0
 
   /** Index of the row we are currently writing. */
   private[this] var outputIndex = 0
 
   /** Prepare the frame for calculating a new partition. */
-  override def prepare(rows: CompactBuffer[InternalRow]): Unit = {
+  override def prepare(rows: ArrayBuffer[InternalRow]): Unit = {
     input = rows
     inputIndex = 0
     outputIndex = 0
-    val size = input.size
-    buffer = Array.ofDim(size)
-    var i = 0
-    while (i < size) {
-      buffer(i) = prepare(input(i))
-      i += 1
-    }
-    evaluatePrepared(buffer, 0, buffer.length)
   }
 
   /** Write the frame columns for the current row to the given target row. */
-  override def write(target: GenericMutableRow): Unit = {
+  override def write(): Unit = {
     var bufferUpdated = outputIndex == 0
 
     // Drop all rows from the buffer for which the input row value is smaller than
@@ -736,15 +691,151 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
 
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
-      evaluatePrepared(buffer, inputIndex, buffer.length)
-      fill(target, outputIndex)
+      processor.initialize(input.size)
+      processor.update(input, inputIndex, input.size)
+      processor.evaluate(target)
     }
 
     // Move to the next row.
     outputIndex += 1
   }
+}
+
+/**
+ * This class prepares and manages the processing of a number of [[AggregateFunction]]s within a
+ * single frame. The [[WindowFunctionFrame]] takes care of processing the frame in the correct way,
+ * this reduces the processing of a [[AggregateWindowFunction]] to processing the underlying
+ * [[AggregateFunction]]. All [[AggregateFunction]]s are processed in [[Complete]] mode.
+ *
+ * [[SizeBasedWindowFunction]]s are initialized in a slightly different way. These functions
+ * require the size of the partition processed, this value is exposed to them when the processor is
+ * constructed.
+ *
+ * Processing of distinct aggregates is currently not supported.
+ *
+ * The implementation is split into an object which takes care of construction, and a the actual
+ * processor class.
+ */
+private[execution] object AggregateProcessor {
+  def apply(functions: Array[Expression],
+      ordinal: Int,
+      inputAttributes: Seq[Attribute],
+      newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => MutableProjection):
+      AggregateProcessor = {
+    val aggBufferAttributes = mutable.Buffer.empty[AttributeReference]
+    val initialValues = mutable.Buffer.empty[Expression]
+    val updateExpressions = mutable.Buffer.empty[Expression]
+    val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp)
+    val imperatives = mutable.Buffer.empty[ImperativeAggregate]
+
+    // Check if there are any SizeBasedWindowFunctions. If there are, we add the partition size to
+    // the aggregation buffer. Note that the ordinal of the partition size value will always be 0.
+    val trackPartitionSize = functions.exists(_.isInstanceOf[SizeBasedWindowFunction])
+    if (trackPartitionSize) {
+      aggBufferAttributes += SizeBasedWindowFunction.n
+      initialValues += NoOp
+      updateExpressions += NoOp
+    }
+
+    // Add an AggregateFunction to the AggregateProcessor.
+    functions.foreach {
+      case agg: DeclarativeAggregate =>
+        aggBufferAttributes ++= agg.aggBufferAttributes
+        initialValues ++= agg.initialValues
+        updateExpressions ++= agg.updateExpressions
+        evaluateExpressions += agg.evaluateExpression
+      case agg: ImperativeAggregate =>
+        val offset = aggBufferAttributes.size
+        val imperative = BindReferences.bindReference(agg
+          .withNewInputAggBufferOffset(offset)
+          .withNewMutableAggBufferOffset(offset),
+          inputAttributes)
+        imperatives += imperative
+        aggBufferAttributes ++= imperative.aggBufferAttributes
+        val noOps = Seq.fill(imperative.aggBufferAttributes.size)(NoOp)
+        initialValues ++= noOps
+        updateExpressions ++= noOps
+        evaluateExpressions += imperative
+      case other =>
+        sys.error(s"Unsupported Aggregate Function: $other")
+    }
+
+    // Create the projections.
+    val initialProjection = newMutableProjection(
+      initialValues,
+      Seq(SizeBasedWindowFunction.n))()
+    val updateProjection = newMutableProjection(
+      updateExpressions,
+      aggBufferAttributes ++ inputAttributes)()
+    val evaluateProjection = newMutableProjection(
+      evaluateExpressions,
+      aggBufferAttributes)()
+
+    // Create the processor
+    new AggregateProcessor(
+      aggBufferAttributes.toArray,
+      initialProjection,
+      updateProjection,
+      evaluateProjection,
+      imperatives.toArray,
+      trackPartitionSize)
+  }
+}
+
+/**
+ * This class manages the processing of a number of aggregate functions. See the documentation of
+ * the object for more information.
+ */
+private[execution] final class AggregateProcessor(
+    private[this] val bufferSchema: Array[AttributeReference],
+    private[this] val initialProjection: MutableProjection,
+    private[this] val updateProjection: MutableProjection,
+    private[this] val evaluateProjection: MutableProjection,
+    private[this] val imperatives: Array[ImperativeAggregate],
+    private[this] val trackPartitionSize: Boolean) {
+
+  private[this] val join = new JoinedRow
+  private[this] val numImperatives = imperatives.length
+  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+  initialProjection.target(buffer)
+  updateProjection.target(buffer)
+
+  /** Create the initial state. */
+  def initialize(size: Int): Unit = {
+    // Some initialization expressions are dependent on the partition size so we have to
+    // initialize the size before initializing all other fields, and we have to pass the buffer to
+    // the initialization projection.
+    if (trackPartitionSize) {
+      buffer.setInt(0, size)
+    }
+    initialProjection(buffer)
+    var i = 0
+    while (i < numImperatives) {
+      imperatives(i).initialize(buffer)
+      i += 1
+    }
+  }
+
+  /** Update the buffer. */
+  def update(input: InternalRow): Unit = {
+    updateProjection(join(buffer, input))
+    var i = 0
+    while (i < numImperatives) {
+      imperatives(i).update(buffer, input)
+      i += 1
+    }
+  }
+
+  /** Bulk update the given buffer. */
+  def update(input: ArrayBuffer[InternalRow], begin: Int, end: Int): Unit = {
+    var i = begin
+    while (i < end) {
+      update(input(i))
+      i += 1
+    }
+  }
 
-  /** Copy the frame. */
-  override def copy: UnboundedFollowingWindowFunctionFrame =
-    new UnboundedFollowingWindowFunctionFrame(ordinal, copyFunctions, lbound)
+  /** Evaluate buffer. */
+  def evaluate(target: MutableRow): Unit =
+    evaluateProjection.target(target)(buffer)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 893e800..9397fb8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -140,57 +140,7 @@ class WindowSpec private[sql](
    * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
    */
   private[sql] def withAggregate(aggregate: Column): Column = {
-    val windowExpr = aggregate.expr match {
-      // First, we check if we get an aggregate function without the DISTINCT keyword.
-      // Right now, we do not support using a DISTINCT aggregate function as a
-      // window function.
-      case AggregateExpression(aggregateFunction, _, isDistinct) if !isDistinct =>
-        aggregateFunction match {
-          case Average(child) => WindowExpression(
-            UnresolvedWindowFunction("avg", child :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case Sum(child) => WindowExpression(
-            UnresolvedWindowFunction("sum", child :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case Count(children) => WindowExpression(
-            UnresolvedWindowFunction("count", children),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case First(child, ignoreNulls) => WindowExpression(
-            // TODO this is a hack for Hive UDAF first_value
-            UnresolvedWindowFunction(
-              "first_value",
-              child :: ignoreNulls :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case Last(child, ignoreNulls) => WindowExpression(
-            // TODO this is a hack for Hive UDAF last_value
-            UnresolvedWindowFunction(
-              "last_value",
-              child :: ignoreNulls :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case Min(child) => WindowExpression(
-            UnresolvedWindowFunction("min", child :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case Max(child) => WindowExpression(
-            UnresolvedWindowFunction("max", child :: Nil),
-            WindowSpecDefinition(partitionSpec, orderSpec, frame))
-          case x =>
-            throw new UnsupportedOperationException(s"$x is not supported in a window operation.")
-        }
-
-      case AggregateExpression(aggregateFunction, _, isDistinct) if isDistinct =>
-        throw new UnsupportedOperationException(
-          s"Distinct aggregate function ${aggregateFunction} is not supported " +
-            s"in window operation.")
-
-      case wf: WindowFunction =>
-        WindowExpression(
-          wf,
-          WindowSpecDefinition(partitionSpec, orderSpec, frame))
-
-      case x =>
-        throw new UnsupportedOperationException(s"$x is not supported in a window operation.")
-    }
-
-    new Column(windowExpr)
+    val spec = WindowSpecDefinition(partitionSpec, orderSpec, frame)
+    new Column(WindowExpression(aggregate.expr, spec))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index e79defb..65733dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -577,7 +577,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.6.0
    */
-  def cume_dist(): Column = withExpr { UnresolvedWindowFunction("cume_dist", Nil) }
+  def cume_dist(): Column = withExpr { new CumeDist }
 
   /**
    * @group window_funcs
@@ -597,7 +597,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.6.0
    */
-  def dense_rank(): Column = withExpr { UnresolvedWindowFunction("dense_rank", Nil) }
+  def dense_rank(): Column = withExpr { new DenseRank }
 
   /**
    * Window function: returns the value that is `offset` rows before the current row, and
@@ -648,7 +648,7 @@ object functions extends LegacyFunctions {
    * @since 1.4.0
    */
   def lag(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
-    UnresolvedWindowFunction("lag", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil)
+    Lag(e.expr, Literal(offset), Literal(defaultValue))
   }
 
   /**
@@ -700,7 +700,7 @@ object functions extends LegacyFunctions {
    * @since 1.4.0
    */
   def lead(e: Column, offset: Int, defaultValue: Any): Column = withExpr {
-    UnresolvedWindowFunction("lead", e.expr :: Literal(offset) :: Literal(defaultValue) :: Nil)
+    Lead(e.expr, Literal(offset), Literal(defaultValue))
   }
 
   /**
@@ -713,7 +713,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.4.0
    */
-  def ntile(n: Int): Column = withExpr { UnresolvedWindowFunction("ntile", lit(n).expr :: Nil) }
+  def ntile(n: Int): Column = withExpr { new NTile(Literal(n)) }
 
   /**
    * @group window_funcs
@@ -735,7 +735,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.6.0
    */
-  def percent_rank(): Column = withExpr { UnresolvedWindowFunction("percent_rank", Nil) }
+  def percent_rank(): Column = withExpr { new PercentRank }
 
   /**
    * Window function: returns the rank of rows within a window partition.
@@ -750,7 +750,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.4.0
    */
-  def rank(): Column = withExpr { UnresolvedWindowFunction("rank", Nil) }
+  def rank(): Column = withExpr { new Rank }
 
   /**
    * @group window_funcs
@@ -765,7 +765,7 @@ object functions extends LegacyFunctions {
    * @group window_funcs
    * @since 1.6.0
    */
-  def row_number(): Column = withExpr { UnresolvedWindowFunction("row_number", Nil) }
+  def row_number(): Column = withExpr { RowNumber() }
 
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Non-aggregate functions

http://git-wip-us.apache.org/repos/asf/spark/blob/658f66e6/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
new file mode 100644
index 0000000..b50d760
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowSuite.scala
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
+
+class DataFrameWindowSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  test("reuse window partitionBy") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    val w = Window.partitionBy("key").orderBy("value")
+
+    checkAnswer(
+      df.select(
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
+      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
+  }
+
+  test("reuse window orderBy") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    val w = Window.orderBy("value").partitionBy("key")
+
+    checkAnswer(
+      df.select(
+        lead("key", 1).over(w),
+        lead("value", 1).over(w)),
+      Row(1, "1") :: Row(2, "2") :: Row(null, null) :: Row(null, null) :: Nil)
+  }
+
+  test("lead") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+
+    checkAnswer(
+      df.select(
+        lead("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
+      Row("1") :: Row(null) :: Row("2") :: Row(null) :: Nil)
+  }
+
+  test("lag") {
+    val df = Seq((1, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+
+    checkAnswer(
+      df.select(
+        lag("value", 1).over(Window.partitionBy($"key").orderBy($"value"))),
+      Row(null) :: Row("1") :: Row(null) :: Row("2") :: Nil)
+  }
+
+  test("lead with default value") {
+    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
+                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        lead("value", 2, "n/a").over(Window.partitionBy("key").orderBy("value"))),
+      Seq(Row("1"), Row("1"), Row("n/a"), Row("n/a"), Row("2"), Row("n/a"), Row("n/a")))
+  }
+
+  test("lag with default value") {
+    val df = Seq((1, "1"), (1, "1"), (2, "2"), (1, "1"),
+                 (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        lag("value", 2, "n/a").over(Window.partitionBy($"key").orderBy($"value"))),
+      Seq(Row("n/a"), Row("n/a"), Row("1"), Row("1"), Row("n/a"), Row("n/a"), Row("2")))
+  }
+
+  test("rank functions in unspecific window") {
+    val df = Seq((1, "1"), (2, "2"), (1, "2"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        max("key").over(Window.partitionBy("value").orderBy("key")),
+        min("key").over(Window.partitionBy("value").orderBy("key")),
+        mean("key").over(Window.partitionBy("value").orderBy("key")),
+        count("key").over(Window.partitionBy("value").orderBy("key")),
+        sum("key").over(Window.partitionBy("value").orderBy("key")),
+        ntile(2).over(Window.partitionBy("value").orderBy("key")),
+        row_number().over(Window.partitionBy("value").orderBy("key")),
+        dense_rank().over(Window.partitionBy("value").orderBy("key")),
+        rank().over(Window.partitionBy("value").orderBy("key")),
+        cume_dist().over(Window.partitionBy("value").orderBy("key")),
+        percent_rank().over(Window.partitionBy("value").orderBy("key"))),
+      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d, 0.0d) ::
+      Row(1, 1, 1, 1.0d, 1, 1, 1, 1, 1, 1, 1.0d / 3.0d, 0.0d) ::
+      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 1, 2, 2, 2, 1.0d, 0.5d) ::
+      Row(2, 2, 1, 5.0d / 3.0d, 3, 5, 2, 3, 2, 2, 1.0d, 0.5d) :: Nil)
+  }
+
+  test("aggregation and rows between") {
+    val df = Seq((1, "1"), (2, "1"), (2, "2"), (1, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        avg("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 2))),
+      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(3.0d / 2.0d), Row(2.0d), Row(2.0d)))
+  }
+
+  test("aggregation and range between") {
+    val df = Seq((1, "1"), (1, "1"), (3, "1"), (2, "2"), (2, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        avg("key").over(Window.partitionBy($"value").orderBy($"key").rangeBetween(-1, 1))),
+      Seq(Row(4.0d / 3.0d), Row(4.0d / 3.0d), Row(7.0d / 4.0d), Row(5.0d / 2.0d),
+        Row(2.0d), Row(2.0d)))
+  }
+
+  test("aggregation and rows between with unbounded") {
+    val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        last("key").over(
+          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, Long.MaxValue)),
+        last("key").over(
+          Window.partitionBy($"value").orderBy($"key").rowsBetween(Long.MinValue, 0)),
+        last("key").over(Window.partitionBy($"value").orderBy($"key").rowsBetween(-1, 1))),
+      Seq(Row(1, 1, 1, 1), Row(2, 3, 2, 3), Row(3, 3, 3, 3), Row(1, 4, 1, 2), Row(2, 4, 2, 4),
+        Row(4, 4, 4, 4)))
+  }
+
+  test("aggregation and range between with unbounded") {
+    val df = Seq((5, "1"), (5, "2"), (4, "2"), (6, "2"), (3, "1"), (2, "2")).toDF("key", "value")
+    df.registerTempTable("window_table")
+    checkAnswer(
+      df.select(
+        $"key",
+        last("value").over(
+          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2, -1))
+          .equalTo("2")
+          .as("last_v"),
+        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(Long.MinValue, 1))
+          .as("avg_key1"),
+        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(0, Long.MaxValue))
+          .as("avg_key2"),
+        avg("key").over(Window.partitionBy("value").orderBy("key").rangeBetween(-1, 0))
+          .as("avg_key3")
+      ),
+      Seq(Row(3, null, 3.0d, 4.0d, 3.0d),
+        Row(5, false, 4.0d, 5.0d, 5.0d),
+        Row(2, null, 2.0d, 17.0d / 4.0d, 2.0d),
+        Row(4, true, 11.0d / 3.0d, 5.0d, 4.0d),
+        Row(5, true, 17.0d / 4.0d, 11.0d / 2.0d, 4.5d),
+        Row(6, true, 17.0d / 4.0d, 6.0d, 11.0d / 2.0d)))
+  }
+
+  test("reverse sliding range frame") {
+    val df = Seq(
+      (1, "Thin", "Cell Phone", 6000),
+      (2, "Normal", "Tablet", 1500),
+      (3, "Mini", "Tablet", 5500),
+      (4, "Ultra thin", "Cell Phone", 5500),
+      (5, "Very thin", "Cell Phone", 6000),
+      (6, "Big", "Tablet", 2500),
+      (7, "Bendable", "Cell Phone", 3000),
+      (8, "Foldable", "Cell Phone", 3000),
+      (9, "Pro", "Tablet", 4500),
+      (10, "Pro2", "Tablet", 6500)).
+      toDF("id", "product", "category", "revenue")
+    val window = Window.
+      partitionBy($"category").
+      orderBy($"revenue".desc).
+      rangeBetween(-2000L, 1000L)
+    checkAnswer(
+      df.select(
+        $"id",
+        avg($"revenue").over(window).cast("int")),
+      Row(1, 5833) :: Row(2, 2000) :: Row(3, 5500) ::
+        Row(4, 5833) :: Row(5, 5833) :: Row(6, 2833) ::
+        Row(7, 3000) :: Row(8, 3000) :: Row(9, 5500) ::
+        Row(10, 6000) :: Nil)
+  }
+
+  // This is here to illustrate the fact that reverse order also reverses offsets.
+  test("reverse unbounded range frame") {
+    val df = Seq(1, 2, 4, 3, 2, 1).
+      map(Tuple1.apply).
+      toDF("value")
+    val window = Window.orderBy($"value".desc)
+    checkAnswer(
+      df.select(
+        $"value",
+        sum($"value").over(window.rangeBetween(Long.MinValue, 1)),
+        sum($"value").over(window.rangeBetween(1, Long.MaxValue))),
+      Row(1, 13, null) :: Row(2, 13, 2) :: Row(4, 7, 9) ::
+        Row(3, 11, 6) :: Row(2, 13, 2) :: Row(1, 13, null) :: Nil)
+  }
+
+  test("statistical functions") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2)).
+      toDF("key", "value")
+    val window = Window.partitionBy($"key")
+    checkAnswer(
+      df.select(
+        $"key",
+        var_pop($"value").over(window),
+        var_samp($"value").over(window),
+        approxCountDistinct($"value").over(window)),
+      Seq.fill(4)(Row("a", 1.0d / 4.0d, 1.0d / 3.0d, 2))
+      ++ Seq.fill(3)(Row("b", 2.0d / 3.0d, 1.0d, 3)))
+  }
+
+  test("window function with aggregates") {
+    val df = Seq(("a", 1), ("a", 1), ("a", 2), ("a", 2), ("b", 4), ("b", 3), ("b", 2)).
+      toDF("key", "value")
+    val window = Window.orderBy()
+    checkAnswer(
+      df.groupBy($"key")
+        .agg(
+          sum($"value"),
+          sum(sum($"value")).over(window) - sum($"value")),
+      Seq(Row("a", 6, 9), Row("b", 9, 6)))
+  }
+
+  test("window function with udaf") {
+    val udaf = new UserDefinedAggregateFunction {
+      def inputSchema: StructType = new StructType()
+        .add("a", LongType)
+        .add("b", LongType)
+
+      def bufferSchema: StructType = new StructType()
+        .add("product", LongType)
+
+      def dataType: DataType = LongType
+
+      def deterministic: Boolean = true
+
+      def initialize(buffer: MutableAggregationBuffer): Unit = {
+        buffer(0) = 0L
+      }
+
+      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+        if (!(input.isNullAt(0) || input.isNullAt(1))) {
+          buffer(0) = buffer.getLong(0) + input.getLong(0) * input.getLong(1)
+        }
+      }
+
+      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
+      }
+
+      def evaluate(buffer: Row): Any =
+        buffer.getLong(0)
+    }
+    val df = Seq(
+      ("a", 1, 1),
+      ("a", 1, 5),
+      ("a", 2, 10),
+      ("a", 2, -1),
+      ("b", 4, 7),
+      ("b", 3, 8),
+      ("b", 2, 4))
+      .toDF("key", "a", "b")
+    val window = Window.partitionBy($"key").orderBy($"a").rangeBetween(Long.MinValue, 0L)
+    checkAnswer(
+      df.select(
+        $"key",
+        $"a",
+        $"b",
+        udaf($"a", $"b").over(window)),
+      Seq(
+        Row("a", 1, 1, 6),
+        Row("a", 1, 5, 6),
+        Row("a", 2, 10, 24),
+        Row("a", 2, -1, 24),
+        Row("b", 4, 7, 60),
+        Row("b", 3, 8, 32),
+        Row("b", 2, 4, 8)))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org