You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/09/27 19:37:24 UTC

[1/2] spark git commit: [SPARK-17677][SQL] Break WindowExec.scala into multiple files

Repository: spark
Updated Branches:
  refs/heads/master 2ab24a7bf -> 67c73052b


http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
new file mode 100644
index 0000000..2ab9faa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A window function frame calculates the results of a number of window functions for a window
+ * frame. Before use a frame must be prepared by passing it all the rows in the current partition.
+ * After preparation the write method can be called to fill the output rows.
+ */
+private[window] abstract class WindowFunctionFrame {
+  /**
+   * Prepare the frame for calculating the results for a partition.
+   *
+   * @param rows to calculate the frame results for.
+   */
+  def prepare(rows: RowBuffer): Unit
+
+  /**
+   * Write the frame results for the output row `current`, located at `index`, to the target row.
+   */
+  def write(index: Int, current: InternalRow): Unit
+}
+
+/**
+ * The offset window frame calculates frames containing LEAD/LAG statements.
+ *
+ * @param target to write results to.
+ * @param ordinal the ordinal is the starting offset at which the results of the window frame get
+ *                written into the (shared) target row. The result of the frame expression with
+ *                index 'i' will be written to the 'ordinal' + 'i' position in the target row.
+ * @param expressions to shift a number of rows.
+ * @param inputSchema required for creating a projection.
+ * @param newMutableProjection function used to create the projection.
+ * @param offset by which rows get moved within a partition.
+ */
+private[window] final class OffsetWindowFunctionFrame(
+    target: MutableRow,
+    ordinal: Int,
+    expressions: Array[OffsetWindowFunction],
+    inputSchema: Seq[Attribute],
+    newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
+    offset: Int)
+  extends WindowFunctionFrame {
+
+  /** Rows of the partition currently being processed. */
+  private[this] var input: RowBuffer = null
+
+  /** Index of the input row currently used for output. */
+  private[this] var inputIndex = 0
+
+  /**
+   * Create the projection used when the offset row exists.
+   * Please note that this projection always respects null input values (like PostgreSQL).
+   */
+  private[this] val projection = {
+    // Collect the expressions and bind them.
+    val inputAttrs = inputSchema.map(_.withNullability(true))
+    val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
+      BindReferences.bindReference(e.input, inputAttrs)
+    }
+
+    // Create the projection.
+    newMutableProjection(boundExpressions, Nil).target(target)
+  }
+
+  /** Create the projection used when the offset row DOES NOT exist. */
+  private[this] val fillDefaultValue = {
+    // Collect the expressions and bind them.
+    val inputAttrs = inputSchema.map(_.withNullability(true))
+    val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
+      if (e.default == null || e.default.foldable && e.default.eval() == null) {
+        // The default value is null.
+        Literal.create(null, e.dataType)
+      } else {
+        // The default value is an expression.
+        BindReferences.bindReference(e.default, inputAttrs)
+      }
+    }
+
+    // Create the projection.
+    newMutableProjection(boundExpressions, Nil).target(target)
+  }
+
+  override def prepare(rows: RowBuffer): Unit = {
+    input = rows
+    // Drain the first `offset` rows when the offset is positive; otherwise nothing is consumed.
+    inputIndex = 0
+    while (inputIndex < offset) {
+      input.next()
+      inputIndex += 1
+    }
+    inputIndex = offset
+  }
+
+  override def write(index: Int, current: InternalRow): Unit = {
+    if (inputIndex >= 0 && inputIndex < input.size) {
+      val r = input.next()
+      projection(r)
+    } else {
+      // Use default values since the offset row does not exist.
+      fillDefaultValue(current)
+    }
+    inputIndex += 1
+  }
+}
+
+/**
+ * The sliding window frame calculates frames with the following SQL form:
+ * ... BETWEEN 1 PRECEDING AND 1 FOLLOWING
+ *
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
+ * @param lbound comparator used to identify the lower bound of an output row.
+ * @param ubound comparator used to identify the upper bound of an output row.
+ */
+private[window] final class SlidingWindowFunctionFrame(
+    target: MutableRow,
+    processor: AggregateProcessor,
+    lbound: BoundOrdering,
+    ubound: BoundOrdering)
+  extends WindowFunctionFrame {
+
+  /** Rows of the partition currently being processed. */
+  private[this] var input: RowBuffer = null
+
+  /** The next row from `input`. */
+  private[this] var nextRow: InternalRow = null
+
+  /** The rows within the current sliding window. */
+  private[this] val buffer = new util.ArrayDeque[InternalRow]()
+
+  /**
+   * Index of the first input row with a value greater than the upper bound of the current
+   * output row.
+   */
+  private[this] var inputHighIndex = 0
+
+  /**
+   * Index of the first input row with a value equal to or greater than the lower bound of the
+   * current output row.
+   */
+  private[this] var inputLowIndex = 0
+
+  /** Prepare the frame for a new partition: reset all state and fetch the first input row. */
+  override def prepare(rows: RowBuffer): Unit = {
+    input = rows
+    nextRow = rows.next()
+    inputHighIndex = 0
+    inputLowIndex = 0
+    buffer.clear()
+  }
+
+  /** Write the frame columns for the current row to the given target row. */
+  override def write(index: Int, current: InternalRow): Unit = {
+    var bufferUpdated = index == 0
+
+    // Add all rows to the buffer for which the input row value is equal to or less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
+      buffer.add(nextRow.copy())
+      nextRow = input.next()
+      inputHighIndex += 1
+      bufferUpdated = true
+    }
+
+    // Drop all rows from the buffer for which the input row value is smaller than
+    // the output row lower bound.
+    while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
+      buffer.remove()
+      inputLowIndex += 1
+      bufferUpdated = true
+    }
+
+    // Only recalculate and update when the buffer changed (always done for the first row).
+    if (bufferUpdated) {
+      processor.initialize(input.size)
+      val iter = buffer.iterator()
+      while (iter.hasNext) {
+        processor.update(iter.next())
+      }
+      processor.evaluate(target)
+    }
+  }
+}
+
+/**
+ * The unbounded window frame calculates frames with the following SQL forms:
+ * ... (No Frame Definition)
+ * ... BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
+ *
+ * Every row in the partition sees the same frame, namely the whole partition. This class can be
+ * seen as a special case of a sliding window, but is optimized for the unbounded case.
+ *
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
+ */
+private[window] final class UnboundedWindowFunctionFrame(
+    target: MutableRow,
+    processor: AggregateProcessor)
+  extends WindowFunctionFrame {
+
+  /** Prepare the frame for a new partition by eagerly feeding every row to the processor. */
+  override def prepare(rows: RowBuffer): Unit = {
+    val numRows = rows.size
+    processor.initialize(numRows)
+    var remaining = numRows
+    while (remaining > 0) {
+      processor.update(rows.next())
+      remaining -= 1
+    }
+  }
+
+  /** Write the frame columns for the current row to the given target row. */
+  override def write(index: Int, current: InternalRow): Unit = {
+    // Evaluation is not guaranteed to be deterministic, so the result cannot be computed once
+    // and reused; it is re-evaluated for every output row.
+    processor.evaluate(target)
+  }
+}
+
+/**
+ * The UnboundPreceding window frame calculates frames with the following SQL form:
+ * ... BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ *
+ * There is only an upper bound. Very common use cases are for instance running sums or counts
+ * (row_number). Technically this is a special case of a sliding window. However a sliding window
+ * has to maintain a buffer, and it must do a full evaluation every time the buffer changes. This
+ * is not the case when there is no lower bound: given the additive nature of most aggregates,
+ * streaming updates and partial evaluation suffice and no buffering is needed.
+ *
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
+ * @param ubound comparator used to identify the upper bound of an output row.
+ */
+private[window] final class UnboundedPrecedingWindowFunctionFrame(
+    target: MutableRow,
+    processor: AggregateProcessor,
+    ubound: BoundOrdering)
+  extends WindowFunctionFrame {
+
+  /** Rows of the partition currently being processed. */
+  private[this] var input: RowBuffer = null
+
+  /** The next row from `input`. */
+  private[this] var nextRow: InternalRow = null
+
+  /**
+   * Index of the first input row with a value greater than the upper bound of the current
+   * output row.
+   */
+  private[this] var inputIndex = 0
+
+  /** Prepare the frame for a new partition: fetch the first row and initialize the processor. */
+  override def prepare(rows: RowBuffer): Unit = {
+    input = rows
+    nextRow = rows.next()
+    inputIndex = 0
+    processor.initialize(input.size)
+  }
+
+  /** Write the frame columns for the current row to the given target row. */
+  override def write(index: Int, current: InternalRow): Unit = {
+    var bufferUpdated = index == 0
+
+    // Add all rows to the aggregates for which the input row value is equal to or less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) {
+      processor.update(nextRow)
+      nextRow = input.next()
+      inputIndex += 1
+      bufferUpdated = true
+    }
+
+    // Only re-evaluate when the aggregates were updated (always done for the first row).
+    if (bufferUpdated) {
+      processor.evaluate(target)
+    }
+  }
+}
+
+/**
+ * The UnboundFollowing window frame calculates frames with the following SQL form:
+ * ... BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
+ *
+ * There is only a lower bound. This is a slightly modified version of the sliding window. The
+ * sliding window operator has to check if both upper and the lower bound change when a new row
+ * gets processed, whereas the unbounded following frame only has to check the lower bound.
+ *
+ * This is a very expensive operator to use, O(n * (n - 1) / 2), because we need to maintain a
+ * buffer and must do full recalculation after each row. Reverse iteration would be possible, if
+ * the commutativity of the used window functions can be guaranteed.
+ *
+ * @param target to write results to.
+ * @param processor to calculate the row values with.
+ * @param lbound comparator used to identify the lower bound of an output row.
+ */
+private[window] final class UnboundedFollowingWindowFunctionFrame(
+    target: MutableRow,
+    processor: AggregateProcessor,
+    lbound: BoundOrdering)
+  extends WindowFunctionFrame {
+
+  /** Rows of the partition currently being processed. */
+  private[this] var input: RowBuffer = null
+
+  /**
+   * Index of the first input row with a value equal to or greater than the lower bound of the
+   * current output row.
+   */
+  private[this] var inputIndex = 0
+
+  /** Prepare the frame for calculating a new partition. */
+  override def prepare(rows: RowBuffer): Unit = {
+    input = rows
+    inputIndex = 0
+  }
+
+  /** Write the frame columns for the current row to the given target row. */
+  override def write(index: Int, current: InternalRow): Unit = {
+    var bufferUpdated = index == 0
+
+    // Duplicate the input to have a new iterator
+    val tmp = input.copy()
+
+    // Skip all rows in the copied input for which the input row value is smaller than
+    // the output row lower bound.
+    tmp.skip(inputIndex)
+    var nextRow = tmp.next()
+    while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) {
+      nextRow = tmp.next()
+      inputIndex += 1
+      bufferUpdated = true
+    }
+
+    // Only recalculate and update when the frame changed (always done for the first row).
+    if (bufferUpdated) {
+      processor.initialize(input.size)
+      while (nextRow != null) {
+        processor.update(nextRow)
+        nextRow = tmp.next()
+      }
+      processor.evaluate(target)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-17677][SQL] Break WindowExec.scala into multiple files

Posted by hv...@apache.org.
[SPARK-17677][SQL] Break WindowExec.scala into multiple files

## What changes were proposed in this pull request?
As of Spark 2.0, all the window function execution code is in WindowExec.scala. This file is pretty large (over 1k loc) and has a lot of different abstractions in it. This patch creates a new package sql.execution.window, moves WindowExec.scala into it, and breaks WindowExec.scala into multiple, more maintainable pieces:

- AggregateProcessor.scala
- BoundOrdering.scala
- RowBuffer.scala
- WindowExec
- WindowFunctionFrame.scala

## How was this patch tested?
This patch mostly moves code around, and should not change any existing test coverage.

Author: Reynold Xin <rx...@databricks.com>

Closes #15252 from rxin/SPARK-17677.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67c73052
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67c73052
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67c73052

Branch: refs/heads/master
Commit: 67c73052b877a8709ae6fa22b844a45f114b1f7e
Parents: 2ab24a7
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Sep 27 12:37:19 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Tue Sep 27 12:37:19 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |    3 +-
 .../apache/spark/sql/execution/WindowExec.scala | 1013 ------------------
 .../execution/window/AggregateProcessor.scala   |  159 +++
 .../sql/execution/window/BoundOrdering.scala    |   58 +
 .../spark/sql/execution/window/RowBuffer.scala  |  115 ++
 .../spark/sql/execution/window/WindowExec.scala |  412 +++++++
 .../execution/window/WindowFunctionFrame.scala  |  367 +++++++
 7 files changed, 1112 insertions(+), 1015 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 3441ccf..7cfae5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{execution, SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
@@ -387,7 +386,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case e @ logical.Expand(_, _, child) =>
         execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(windowExprs, partitionSpec, orderSpec, child) =>
-        execution.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
+        execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil
       case logical.Sample(lb, ub, withReplacement, seed, child) =>
         execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
       case logical.LocalRelation(output, data) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
deleted file mode 100644
index 9d006d2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WindowExec.scala
+++ /dev/null
@@ -1,1013 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.types.IntegerType
-import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
-
-/**
- * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
- * partition. The aggregates are calculated for each row in the group. Special processing
- * instructions, frames, are used to calculate these aggregates. Frames are processed in the order
- * specified in the window specification (the ORDER BY ... clause). There are four different frame
- * types:
- * - Entire partition: The frame is the entire partition, i.e.
- *   UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, window function will take all
- *   rows as inputs and be evaluated once.
- * - Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND ....
- *   Every time we move to a new row to process, we add some rows to the frame. We do not remove
- *   rows from this frame.
- * - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING.
- *   Every time we move to a new row to process, we remove some rows from the frame. We do not add
- *   rows to this frame.
- * - Moving frame: Every time we move to a new row to process, we remove some rows from the frame
- *   and we add some rows to the frame. Examples are:
- *     1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING.
- * - Offset frame: The frame consist of one row, which is an offset number of rows away from the
- *   current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame.
- *
- * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
- * boundary can be either Row or Range based:
- * - Row Based: A row based boundary is based on the position of the row within the partition.
- *   An offset indicates the number of rows above or below the current row, the frame for the
- *   current row starts or ends. For instance, given a row based sliding frame with a lower bound
- *   offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from
- *   index 4 to index 6.
- * - Range based: A range based boundary is based on the actual value of the ORDER BY
- *   expression(s). An offset is used to alter the value of the ORDER BY expression, for
- *   instance if the current order by expression has a value of 10 and the lower bound offset
- *   is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
- *   number of constraints on the ORDER BY expressions: there can be only one expression and this
- *   expression must have a numerical data type. An exception can be made when the offset is 0,
- *   because no value modification is needed, in this case multiple and non-numeric ORDER BY
- *   expression are allowed.
- *
- * This is quite an expensive operator because every row for a single group must be in the same
- * partition and partitions must be sorted according to the grouping and sort order. The operator
- * requires the planner to take care of the partitioning and sorting.
- *
- * The operator is semi-blocking. The window functions and aggregates are calculated one group at
- * a time, the result will only be made available after the processing for the entire group has
- * finished. The operator is able to process different frame configurations at the same time. This
- * is done by delegating the actual frame processing (i.e. calculation of the window functions) to
- * specialized classes, see [[WindowFunctionFrame]], which take care of their own frame type:
- * Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair
- * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
- */
-case class WindowExec(
-    windowExpression: Seq[NamedExpression],
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    child: SparkPlan)
-  extends UnaryExecNode {
-
-  override def output: Seq[Attribute] =
-    child.output ++ windowExpression.map(_.toAttribute)
-
-  override def requiredChildDistribution: Seq[Distribution] = {
-    if (partitionSpec.isEmpty) {
-      // Only show warning when the number of bytes is larger than 100 MB?
-      logWarning("No Partition Defined for Window operation! Moving all data to a single "
-        + "partition, this can cause serious performance degradation.")
-      AllTuples :: Nil
-    } else ClusteredDistribution(partitionSpec) :: Nil
-  }
-
-  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
-    Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)
-
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  /**
-   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
-   * used to determine which input row lies within the frame boundaries of an output row.
-   *
-   * This method uses Code Generation. It can only be used on the executor side.
-   *
-   * @param frameType to evaluate. This can either be Row or Range based.
-   * @param offset with respect to the row.
-   * @return a bound ordering object.
-   */
-  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
-    frameType match {
-      case RangeFrame =>
-        val (exprs, current, bound) = if (offset == 0) {
-          // Use the entire order expression when the offset is 0.
-          val exprs = orderSpec.map(_.child)
-          val buildProjection = () => newMutableProjection(exprs, child.output)
-          (orderSpec, buildProjection(), buildProjection())
-        } else if (orderSpec.size == 1) {
-          // Use only the first order expression when the offset is non-null.
-          val sortExpr = orderSpec.head
-          val expr = sortExpr.child
-          // Create the projection which returns the current 'value'.
-          val current = newMutableProjection(expr :: Nil, child.output)
-          // Flip the sign of the offset when processing the order is descending
-          val boundOffset = sortExpr.direction match {
-            case Descending => -offset
-            case Ascending => offset
-          }
-          // Create the projection which returns the current 'value' modified by adding the offset.
-          val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
-          val bound = newMutableProjection(boundExpr :: Nil, child.output)
-          (sortExpr :: Nil, current, bound)
-        } else {
-          sys.error("Non-Zero range offsets are not supported for windows " +
-            "with multiple order expressions.")
-        }
-        // Construct the ordering. This is used to compare the result of current value projection
-        // to the result of bound value projection. This is done manually because we want to use
-        // Code Generation (if it is enabled).
-        val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
-          SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
-        }
-        val ordering = newOrdering(sortExprs, Nil)
-        RangeBoundOrdering(ordering, current, bound)
-      case RowFrame => RowBoundOrdering(offset)
-    }
-  }
-
-  /**
-   * Collection containing an entry for each window frame to process. Each entry contains a frames'
-   * WindowExpressions and factory function for the WindowFrameFunction.
-   */
-  private[this] lazy val windowFrameExpressionFactoryPairs = {
-    type FrameKey = (String, FrameType, Option[Int], Option[Int])
-    type ExpressionBuffer = mutable.Buffer[Expression]
-    val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
-
-    // Add a function and its function to the map for a given frame.
-    def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
-      val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
-      val (es, fns) = framedFunctions.getOrElseUpdate(
-        key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
-      es += e
-      fns += fn
-    }
-
-    // Collect all valid window functions and group them by their frame.
-    windowExpression.foreach { x =>
-      x.foreach {
-        case e @ WindowExpression(function, spec) =>
-          val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
-          function match {
-            case AggregateExpression(f, _, _, _) => collect("AGGREGATE", frame, e, f)
-            case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
-            case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
-            case f => sys.error(s"Unsupported window function: $f")
-          }
-        case _ =>
-      }
-    }
-
-    // Map the groups to a (unbound) expression and frame factory pair.
-    var numExpressions = 0
-    framedFunctions.toSeq.map {
-      case (key, (expressions, functionSeq)) =>
-        val ordinal = numExpressions
-        val functions = functionSeq.toArray
-
-        // Construct an aggregate processor if we need one.
-        def processor = AggregateProcessor(
-          functions,
-          ordinal,
-          child.output,
-          (expressions, schema) =>
-            newMutableProjection(expressions, schema, subexpressionEliminationEnabled))
-
-        // Create the factory
-        val factory = key match {
-          // Offset Frame
-          case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
-            target: MutableRow =>
-              new OffsetWindowFunctionFrame(
-                target,
-                ordinal,
-                // OFFSET frame functions are guaranteed be OffsetWindowFunctions.
-                functions.map(_.asInstanceOf[OffsetWindowFunction]),
-                child.output,
-                (expressions, schema) =>
-                  newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
-                offset)
-
-          // Growing Frame.
-          case ("AGGREGATE", frameType, None, Some(high)) =>
-            target: MutableRow => {
-              new UnboundedPrecedingWindowFunctionFrame(
-                target,
-                processor,
-                createBoundOrdering(frameType, high))
-            }
-
-          // Shrinking Frame.
-          case ("AGGREGATE", frameType, Some(low), None) =>
-            target: MutableRow => {
-              new UnboundedFollowingWindowFunctionFrame(
-                target,
-                processor,
-                createBoundOrdering(frameType, low))
-            }
-
-          // Moving Frame.
-          case ("AGGREGATE", frameType, Some(low), Some(high)) =>
-            target: MutableRow => {
-              new SlidingWindowFunctionFrame(
-                target,
-                processor,
-                createBoundOrdering(frameType, low),
-                createBoundOrdering(frameType, high))
-            }
-
-          // Entire Partition Frame.
-          case ("AGGREGATE", frameType, None, None) =>
-            target: MutableRow => {
-              new UnboundedWindowFunctionFrame(target, processor)
-            }
-        }
-
-        // Keep track of the number of expressions. This is a side-effect in a map...
-        numExpressions += expressions.size
-
-        // Create the Frame Expression - Factory pair.
-        (expressions, factory)
-    }
-  }
-
-  /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor side.
-   *
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
-   */
-  private[this] def createResultProjection(
-      expressions: Seq[Expression]): UnsafeProjection = {
-    val references = expressions.zipWithIndex.map{ case (e, i) =>
-      // Results of window expressions will be on the right side of child's output
-      BoundReference(child.output.size + i, e.dataType, e.nullable)
-    }
-    val unboundToRefMap = expressions.zip(references).toMap
-    val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
-    UnsafeProjection.create(
-      child.output ++ patchedWindowExpression,
-      child.output)
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    // Unwrap the expressions and factories from the map.
-    val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
-    val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
-
-    // Start processing.
-    child.execute().mapPartitions { stream =>
-      new Iterator[InternalRow] {
-
-        // Get all relevant projections.
-        val result = createResultProjection(expressions)
-        val grouping = UnsafeProjection.create(partitionSpec, child.output)
-
-        // Manage the stream and the grouping.
-        var nextRow: UnsafeRow = null
-        var nextGroup: UnsafeRow = null
-        var nextRowAvailable: Boolean = false
-        private[this] def fetchNextRow() {
-          nextRowAvailable = stream.hasNext
-          if (nextRowAvailable) {
-            nextRow = stream.next().asInstanceOf[UnsafeRow]
-            nextGroup = grouping(nextRow)
-          } else {
-            nextRow = null
-            nextGroup = null
-          }
-        }
-        fetchNextRow()
-
-        // Manage the current partition.
-        val rows = ArrayBuffer.empty[UnsafeRow]
-        val inputFields = child.output.length
-        var sorter: UnsafeExternalSorter = null
-        var rowBuffer: RowBuffer = null
-        val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
-        val frames = factories.map(_(windowFunctionResult))
-        val numFrames = frames.length
-        private[this] def fetchNextPartition() {
-          // Collect all the rows in the current partition.
-          // Before we start to fetch new input rows, make a copy of nextGroup.
-          val currentGroup = nextGroup.copy()
-
-          // clear last partition
-          if (sorter != null) {
-            // the last sorter of this task will be cleaned up via task completion listener
-            sorter.cleanupResources()
-            sorter = null
-          } else {
-            rows.clear()
-          }
-
-          while (nextRowAvailable && nextGroup == currentGroup) {
-            if (sorter == null) {
-              rows += nextRow.copy()
-
-              if (rows.length >= 4096) {
-                // We will not sort the rows, so prefixComparator and recordComparator are null.
-                sorter = UnsafeExternalSorter.create(
-                  TaskContext.get().taskMemoryManager(),
-                  SparkEnv.get.blockManager,
-                  SparkEnv.get.serializerManager,
-                  TaskContext.get(),
-                  null,
-                  null,
-                  1024,
-                  SparkEnv.get.memoryManager.pageSizeBytes,
-                  SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
-                    UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
-                  false)
-                rows.foreach { r =>
-                  sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
-                }
-                rows.clear()
-              }
-            } else {
-              sorter.insertRecord(nextRow.getBaseObject, nextRow.getBaseOffset,
-                nextRow.getSizeInBytes, 0, false)
-            }
-            fetchNextRow()
-          }
-          if (sorter != null) {
-            rowBuffer = new ExternalRowBuffer(sorter, inputFields)
-          } else {
-            rowBuffer = new ArrayRowBuffer(rows)
-          }
-
-          // Setup the frames.
-          var i = 0
-          while (i < numFrames) {
-            frames(i).prepare(rowBuffer.copy())
-            i += 1
-          }
-
-          // Setup iteration
-          rowIndex = 0
-          rowsSize = rowBuffer.size()
-        }
-
-        // Iteration
-        var rowIndex = 0
-        var rowsSize = 0L
-
-        override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable
-
-        val join = new JoinedRow
-        override final def next(): InternalRow = {
-          // Load the next partition if we need to.
-          if (rowIndex >= rowsSize && nextRowAvailable) {
-            fetchNextPartition()
-          }
-
-          if (rowIndex < rowsSize) {
-            // Get the results for the window frames.
-            var i = 0
-            val current = rowBuffer.next()
-            while (i < numFrames) {
-              frames(i).write(rowIndex, current)
-              i += 1
-            }
-
-            // 'Merge' the input row with the window function result
-            join(current, windowFunctionResult)
-            rowIndex += 1
-
-            // Return the projection.
-            result(join)
-          } else throw new NoSuchElementException
-        }
-      }
-    }
-  }
-}
-
-/**
- * Function for comparing boundary values.
- */
-private[execution] abstract class BoundOrdering {
-  def compare(inputRow: InternalRow, inputIndex: Int, outputRow: InternalRow, outputIndex: Int): Int
-}
-
-/**
- * Compare the input index to the bound of the output index.
- */
-private[execution] final case class RowBoundOrdering(offset: Int) extends BoundOrdering {
-  override def compare(
-      inputRow: InternalRow,
-      inputIndex: Int,
-      outputRow: InternalRow,
-      outputIndex: Int): Int =
-    inputIndex - (outputIndex + offset)
-}
-
-/**
- * Compare the value of the input index to the value bound of the output index.
- */
-private[execution] final case class RangeBoundOrdering(
-    ordering: Ordering[InternalRow],
-    current: Projection,
-    bound: Projection) extends BoundOrdering {
-  override def compare(
-      inputRow: InternalRow,
-      inputIndex: Int,
-      outputRow: InternalRow,
-      outputIndex: Int): Int =
-    ordering.compare(current(inputRow), bound(outputRow))
-}
-
-/**
- * The interface of row buffer for a partition
- */
-private[execution] abstract class RowBuffer {
-
-  /** Number of rows. */
-  def size(): Int
-
-  /** Return next row in the buffer, null if no more left. */
-  def next(): InternalRow
-
-  /** Skip the next `n` rows. */
-  def skip(n: Int): Unit
-
-  /** Return a new RowBuffer that has the same rows. */
-  def copy(): RowBuffer
-}
-
-/**
- * A row buffer based on ArrayBuffer (the number of rows is limited)
- */
-private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer {
-
-  private[this] var cursor: Int = -1
-
-  /** Number of rows. */
-  def size(): Int = buffer.length
-
-  /** Return next row in the buffer, null if no more left. */
-  def next(): InternalRow = {
-    cursor += 1
-    if (cursor < buffer.length) {
-      buffer(cursor)
-    } else {
-      null
-    }
-  }
-
-  /** Skip the next `n` rows. */
-  def skip(n: Int): Unit = {
-    cursor += n
-  }
-
-  /** Return a new RowBuffer that has the same rows. */
-  def copy(): RowBuffer = {
-    new ArrayRowBuffer(buffer)
-  }
-}
-
-/**
- * An external buffer of rows based on UnsafeExternalSorter
- */
-private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)
-  extends RowBuffer {
-
-  private[this] val iter: UnsafeSorterIterator = sorter.getIterator
-
-  private[this] val currentRow = new UnsafeRow(numFields)
-
-  /** Number of rows. */
-  def size(): Int = iter.getNumRecords()
-
-  /** Return next row in the buffer, null if no more left. */
-  def next(): InternalRow = {
-    if (iter.hasNext) {
-      iter.loadNext()
-      currentRow.pointTo(iter.getBaseObject, iter.getBaseOffset, iter.getRecordLength)
-      currentRow
-    } else {
-      null
-    }
-  }
-
-  /** Skip the next `n` rows. */
-  def skip(n: Int): Unit = {
-    var i = 0
-    while (i < n && iter.hasNext) {
-      iter.loadNext()
-      i += 1
-    }
-  }
-
-  /** Return a new RowBuffer that has the same rows. */
-  def copy(): RowBuffer = {
-    new ExternalRowBuffer(sorter, numFields)
-  }
-}
-
-/**
- * A window function calculates the results of a number of window functions for a window frame.
- * Before use a frame must be prepared by passing it all the rows in the current partition. After
- * preparation the update method can be called to fill the output rows.
- */
-private[execution] abstract class WindowFunctionFrame {
-  /**
-   * Prepare the frame for calculating the results for a partition.
-   *
-   * @param rows to calculate the frame results for.
-   */
-  def prepare(rows: RowBuffer): Unit
-
-  /**
-   * Write the current results to the target row.
-   */
-  def write(index: Int, current: InternalRow): Unit
-}
-
-/**
- * The offset window frame calculates frames containing LEAD/LAG statements.
- *
- * @param target to write results to.
- * @param ordinal the ordinal is the starting offset at which the results of the window frame get
- *                written into the (shared) target row. The result of the frame expression with
- *                index 'i' will be written to the 'ordinal' + 'i' position in the target row.
- * @param expressions to shift a number of rows.
- * @param inputSchema required for creating a projection.
- * @param newMutableProjection function used to create the projection.
- * @param offset by which rows get moved within a partition.
- */
-private[execution] final class OffsetWindowFunctionFrame(
-    target: MutableRow,
-    ordinal: Int,
-    expressions: Array[OffsetWindowFunction],
-    inputSchema: Seq[Attribute],
-    newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection,
-    offset: Int) extends WindowFunctionFrame {
-
-  /** Rows of the partition currently being processed. */
-  private[this] var input: RowBuffer = null
-
-  /** Index of the input row currently used for output. */
-  private[this] var inputIndex = 0
-
-  /**
-   * Create the projection used when the offset row exists.
-   * Please note that this project always respect null input values (like PostgreSQL).
-   */
-  private[this] val projection = {
-    // Collect the expressions and bind them.
-    val inputAttrs = inputSchema.map(_.withNullability(true))
-    val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
-      BindReferences.bindReference(e.input, inputAttrs)
-    }
-
-    // Create the projection.
-    newMutableProjection(boundExpressions, Nil).target(target)
-  }
-
-  /** Create the projection used when the offset row DOES NOT exists. */
-  private[this] val fillDefaultValue = {
-    // Collect the expressions and bind them.
-    val inputAttrs = inputSchema.map(_.withNullability(true))
-    val boundExpressions = Seq.fill(ordinal)(NoOp) ++ expressions.toSeq.map { e =>
-      if (e.default == null || e.default.foldable && e.default.eval() == null) {
-        // The default value is null.
-        Literal.create(null, e.dataType)
-      } else {
-        // The default value is an expression.
-        BindReferences.bindReference(e.default, inputAttrs)
-      }
-    }
-
-    // Create the projection.
-    newMutableProjection(boundExpressions, Nil).target(target)
-  }
-
-  override def prepare(rows: RowBuffer): Unit = {
-    input = rows
-    // drain the first few rows if offset is larger than zero
-    inputIndex = 0
-    while (inputIndex < offset) {
-      input.next()
-      inputIndex += 1
-    }
-    inputIndex = offset
-  }
-
-  override def write(index: Int, current: InternalRow): Unit = {
-    if (inputIndex >= 0 && inputIndex < input.size) {
-      val r = input.next()
-      projection(r)
-    } else {
-      // Use default values since the offset row does not exist.
-      fillDefaultValue(current)
-    }
-    inputIndex += 1
-  }
-}
-
-/**
- * The sliding window frame calculates frames with the following SQL form:
- * ... BETWEEN 1 PRECEDING AND 1 FOLLOWING
- *
- * @param target to write results to.
- * @param processor to calculate the row values with.
- * @param lbound comparator used to identify the lower bound of an output row.
- * @param ubound comparator used to identify the upper bound of an output row.
- */
-private[execution] final class SlidingWindowFunctionFrame(
-    target: MutableRow,
-    processor: AggregateProcessor,
-    lbound: BoundOrdering,
-    ubound: BoundOrdering) extends WindowFunctionFrame {
-
-  /** Rows of the partition currently being processed. */
-  private[this] var input: RowBuffer = null
-
-  /** The next row from `input`. */
-  private[this] var nextRow: InternalRow = null
-
-  /** The rows within current sliding window. */
-  private[this] val buffer = new util.ArrayDeque[InternalRow]()
-
-  /**
-   * Index of the first input row with a value greater than the upper bound of the current
-   * output row.
-   */
-  private[this] var inputHighIndex = 0
-
-  /**
-   * Index of the first input row with a value equal to or greater than the lower bound of the
-   * current output row.
-   */
-  private[this] var inputLowIndex = 0
-
-  /** Prepare the frame for calculating a new partition. Reset all variables. */
-  override def prepare(rows: RowBuffer): Unit = {
-    input = rows
-    nextRow = rows.next()
-    inputHighIndex = 0
-    inputLowIndex = 0
-    buffer.clear()
-  }
-
-  /** Write the frame columns for the current row to the given target row. */
-  override def write(index: Int, current: InternalRow): Unit = {
-    var bufferUpdated = index == 0
-
-    // Add all rows to the buffer for which the input row value is equal to or less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
-      buffer.add(nextRow.copy())
-      nextRow = input.next()
-      inputHighIndex += 1
-      bufferUpdated = true
-    }
-
-    // Drop all rows from the buffer for which the input row value is smaller than
-    // the output row lower bound.
-    while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
-      buffer.remove()
-      inputLowIndex += 1
-      bufferUpdated = true
-    }
-
-    // Only recalculate and update when the buffer changes.
-    if (bufferUpdated) {
-      processor.initialize(input.size)
-      val iter = buffer.iterator()
-      while (iter.hasNext) {
-        processor.update(iter.next())
-      }
-      processor.evaluate(target)
-    }
-  }
-}
-
-/**
- * The unbounded window frame calculates frames with the following SQL forms:
- * ... (No Frame Definition)
- * ... BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
- *
- * Its results are  the same for each and every row in the partition. This class can be seen as a
- * special case of a sliding window, but is optimized for the unbound case.
- *
- * @param target to write results to.
- * @param processor to calculate the row values with.
- */
-private[execution] final class UnboundedWindowFunctionFrame(
-    target: MutableRow,
-    processor: AggregateProcessor) extends WindowFunctionFrame {
-
-  /** Prepare the frame for calculating a new partition. Process all rows eagerly. */
-  override def prepare(rows: RowBuffer): Unit = {
-    val size = rows.size()
-    processor.initialize(size)
-    var i = 0
-    while (i < size) {
-      processor.update(rows.next())
-      i += 1
-    }
-  }
-
-  /** Write the frame columns for the current row to the given target row. */
-  override def write(index: Int, current: InternalRow): Unit = {
-    // Unfortunately we cannot assume that evaluation is deterministic. So we need to re-evaluate
-    // for each row.
-    processor.evaluate(target)
-  }
-}
-
-/**
- * The UnboundPreceding window frame calculates frames with the following SQL form:
- * ... BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
- *
- * There is only an upper bound. Very common use cases are for instance running sums or counts
- * (row_number). Technically this is a special case of a sliding window. However a sliding window
- * has to maintain a buffer, and it must do a full evaluation everytime the buffer changes. This
- * is not the case when there is no lower bound, given the additive nature of most aggregates
- * streaming updates and partial evaluation suffice and no buffering is needed.
- *
- * @param target to write results to.
- * @param processor to calculate the row values with.
- * @param ubound comparator used to identify the upper bound of an output row.
- */
-private[execution] final class UnboundedPrecedingWindowFunctionFrame(
-    target: MutableRow,
-    processor: AggregateProcessor,
-    ubound: BoundOrdering) extends WindowFunctionFrame {
-
-  /** Rows of the partition currently being processed. */
-  private[this] var input: RowBuffer = null
-
-  /** The next row from `input`. */
-  private[this] var nextRow: InternalRow = null
-
-  /**
-   * Index of the first input row with a value greater than the upper bound of the current
-   * output row.
-   */
-  private[this] var inputIndex = 0
-
-  /** Prepare the frame for calculating a new partition. */
-  override def prepare(rows: RowBuffer): Unit = {
-    input = rows
-    nextRow = rows.next()
-    inputIndex = 0
-    processor.initialize(input.size)
-  }
-
-  /** Write the frame columns for the current row to the given target row. */
-  override def write(index: Int, current: InternalRow): Unit = {
-    var bufferUpdated = index == 0
-
-    // Add all rows to the aggregates for which the input row value is equal to or less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputIndex, current, index) <= 0) {
-      processor.update(nextRow)
-      nextRow = input.next()
-      inputIndex += 1
-      bufferUpdated = true
-    }
-
-    // Only recalculate and update when the buffer changes.
-    if (bufferUpdated) {
-      processor.evaluate(target)
-    }
-  }
-}
-
-/**
- * The UnboundFollowing window frame calculates frames with the following SQL form:
- * ... BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
- *
- * There is only an upper bound. This is a slightly modified version of the sliding window. The
- * sliding window operator has to check if both upper and the lower bound change when a new row
- * gets processed, where as the unbounded following only has to check the lower bound.
- *
- * This is a very expensive operator to use, O(n * (n - 1) /2), because we need to maintain a
- * buffer and must do full recalculation after each row. Reverse iteration would be possible, if
- * the commutativity of the used window functions can be guaranteed.
- *
- * @param target to write results to.
- * @param processor to calculate the row values with.
- * @param lbound comparator used to identify the lower bound of an output row.
- */
-private[execution] final class UnboundedFollowingWindowFunctionFrame(
-    target: MutableRow,
-    processor: AggregateProcessor,
-    lbound: BoundOrdering) extends WindowFunctionFrame {
-
-  /** Rows of the partition currently being processed. */
-  private[this] var input: RowBuffer = null
-
-  /**
-   * Index of the first input row with a value equal to or greater than the lower bound of the
-   * current output row.
-   */
-  private[this] var inputIndex = 0
-
-  /** Prepare the frame for calculating a new partition. */
-  override def prepare(rows: RowBuffer): Unit = {
-    input = rows
-    inputIndex = 0
-  }
-
-  /** Write the frame columns for the current row to the given target row. */
-  override def write(index: Int, current: InternalRow): Unit = {
-    var bufferUpdated = index == 0
-
-    // Duplicate the input to have a new iterator
-    val tmp = input.copy()
-
-    // Drop all rows from the buffer for which the input row value is smaller than
-    // the output row lower bound.
-    tmp.skip(inputIndex)
-    var nextRow = tmp.next()
-    while (nextRow != null && lbound.compare(nextRow, inputIndex, current, index) < 0) {
-      nextRow = tmp.next()
-      inputIndex += 1
-      bufferUpdated = true
-    }
-
-    // Only recalculate and update when the buffer changes.
-    if (bufferUpdated) {
-      processor.initialize(input.size)
-      while (nextRow != null) {
-        processor.update(nextRow)
-        nextRow = tmp.next()
-      }
-      processor.evaluate(target)
-    }
-  }
-}
-
-/**
- * This class prepares and manages the processing of a number of [[AggregateFunction]]s within a
- * single frame. The [[WindowFunctionFrame]] takes care of processing the frame in the correct way,
- * this reduces the processing of a [[AggregateWindowFunction]] to processing the underlying
- * [[AggregateFunction]]. All [[AggregateFunction]]s are processed in [[Complete]] mode.
- *
- * [[SizeBasedWindowFunction]]s are initialized in a slightly different way. These functions
- * require the size of the partition processed, this value is exposed to them when the processor is
- * constructed.
- *
- * Processing of distinct aggregates is currently not supported.
- *
- * The implementation is split into an object which takes care of construction, and a the actual
- * processor class.
- */
-private[execution] object AggregateProcessor {
-  def apply(
-      functions: Array[Expression],
-      ordinal: Int,
-      inputAttributes: Seq[Attribute],
-      newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection):
-      AggregateProcessor = {
-    val aggBufferAttributes = mutable.Buffer.empty[AttributeReference]
-    val initialValues = mutable.Buffer.empty[Expression]
-    val updateExpressions = mutable.Buffer.empty[Expression]
-    val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp)
-    val imperatives = mutable.Buffer.empty[ImperativeAggregate]
-
-    // SPARK-14244: `SizeBasedWindowFunction`s are firstly created on driver side and then
-    // serialized to executor side. These functions all reference a global singleton window
-    // partition size attribute reference, i.e., `SizeBasedWindowFunction.n`. Here we must collect
-    // the singleton instance created on driver side instead of using executor side
-    // `SizeBasedWindowFunction.n` to avoid binding failure caused by mismatching expression ID.
-    val partitionSize: Option[AttributeReference] = {
-      val aggs = functions.flatMap(_.collectFirst { case f: SizeBasedWindowFunction => f })
-      aggs.headOption.map(_.n)
-    }
-
-    // Check if there are any SizeBasedWindowFunctions. If there are, we add the partition size to
-    // the aggregation buffer. Note that the ordinal of the partition size value will always be 0.
-    partitionSize.foreach { n =>
-      aggBufferAttributes += n
-      initialValues += NoOp
-      updateExpressions += NoOp
-    }
-
-    // Add an AggregateFunction to the AggregateProcessor.
-    functions.foreach {
-      case agg: DeclarativeAggregate =>
-        aggBufferAttributes ++= agg.aggBufferAttributes
-        initialValues ++= agg.initialValues
-        updateExpressions ++= agg.updateExpressions
-        evaluateExpressions += agg.evaluateExpression
-      case agg: ImperativeAggregate =>
-        val offset = aggBufferAttributes.size
-        val imperative = BindReferences.bindReference(agg
-          .withNewInputAggBufferOffset(offset)
-          .withNewMutableAggBufferOffset(offset),
-          inputAttributes)
-        imperatives += imperative
-        aggBufferAttributes ++= imperative.aggBufferAttributes
-        val noOps = Seq.fill(imperative.aggBufferAttributes.size)(NoOp)
-        initialValues ++= noOps
-        updateExpressions ++= noOps
-        evaluateExpressions += imperative
-      case other =>
-        sys.error(s"Unsupported Aggregate Function: $other")
-    }
-
-    // Create the projections.
-    val initialProjection = newMutableProjection(
-      initialValues,
-      partitionSize.toSeq)
-    val updateProjection = newMutableProjection(
-      updateExpressions,
-      aggBufferAttributes ++ inputAttributes)
-    val evaluateProjection = newMutableProjection(
-      evaluateExpressions,
-      aggBufferAttributes)
-
-    // Create the processor
-    new AggregateProcessor(
-      aggBufferAttributes.toArray,
-      initialProjection,
-      updateProjection,
-      evaluateProjection,
-      imperatives.toArray,
-      partitionSize.isDefined)
-  }
-}
-
-/**
- * This class manages the processing of a number of aggregate functions. See the documentation of
- * the object for more information.
- */
-private[execution] final class AggregateProcessor(
-    private[this] val bufferSchema: Array[AttributeReference],
-    private[this] val initialProjection: MutableProjection,
-    private[this] val updateProjection: MutableProjection,
-    private[this] val evaluateProjection: MutableProjection,
-    private[this] val imperatives: Array[ImperativeAggregate],
-    private[this] val trackPartitionSize: Boolean) {
-
-  private[this] val join = new JoinedRow
-  private[this] val numImperatives = imperatives.length
-  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
-  initialProjection.target(buffer)
-  updateProjection.target(buffer)
-
-  /** Create the initial state. */
-  def initialize(size: Int): Unit = {
-    // Some initialization expressions are dependent on the partition size so we have to
-    // initialize the size before initializing all other fields, and we have to pass the buffer to
-    // the initialization projection.
-    if (trackPartitionSize) {
-      buffer.setInt(0, size)
-    }
-    initialProjection(buffer)
-    var i = 0
-    while (i < numImperatives) {
-      imperatives(i).initialize(buffer)
-      i += 1
-    }
-  }
-
-  /** Update the buffer. */
-  def update(input: InternalRow): Unit = {
-    updateProjection(join(buffer, input))
-    var i = 0
-    while (i < numImperatives) {
-      imperatives(i).update(buffer, input)
-      i += 1
-    }
-  }
-
-  /** Evaluate buffer. */
-  def evaluate(target: MutableRow): Unit =
-    evaluateProjection.target(target)(buffer)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
new file mode 100644
index 0000000..d3a46d0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+
+/**
+ * This class prepares and manages the processing of a number of [[AggregateFunction]]s within a
+ * single frame. The [[WindowFunctionFrame]] takes care of processing the frame in the correct way;
+ * this reduces the processing of a [[AggregateWindowFunction]] to processing the underlying
+ * [[AggregateFunction]]. All [[AggregateFunction]]s are processed in [[Complete]] mode.
+ *
+ * [[SizeBasedWindowFunction]]s are initialized in a slightly different way. These functions
+ * require the size of the partition processed; this value is stored at ordinal 0 of the
+ * aggregation buffer when the processor is initialized.
+ *
+ * Processing of distinct aggregates is currently not supported.
+ *
+ * The implementation is split into an object which takes care of construction, and the actual
+ * processor class.
+ */
+private[window] object AggregateProcessor {
+  def apply(
+      functions: Array[Expression],
+      ordinal: Int,
+      inputAttributes: Seq[Attribute],
+      newMutableProjection: (Seq[Expression], Seq[Attribute]) => MutableProjection)
+    : AggregateProcessor = {
+    val aggBufferAttributes = mutable.Buffer.empty[AttributeReference]
+    val initialValues = mutable.Buffer.empty[Expression]
+    val updateExpressions = mutable.Buffer.empty[Expression]
+    val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp)
+    val imperatives = mutable.Buffer.empty[ImperativeAggregate]
+
+    // SPARK-14244: `SizeBasedWindowFunction`s are firstly created on driver side and then
+    // serialized to executor side. These functions all reference a global singleton window
+    // partition size attribute reference, i.e., `SizeBasedWindowFunction.n`. Here we must collect
+    // the singleton instance created on driver side instead of using executor side
+    // `SizeBasedWindowFunction.n` to avoid binding failure caused by mismatching expression ID.
+    val partitionSize: Option[AttributeReference] = {
+      val aggs = functions.flatMap(_.collectFirst { case f: SizeBasedWindowFunction => f })
+      aggs.headOption.map(_.n)
+    }
+
+    // Check if there are any SizeBasedWindowFunctions. If there are, we add the partition size to
+    // the aggregation buffer. Note that the ordinal of the partition size value will always be 0.
+    partitionSize.foreach { n =>
+      aggBufferAttributes += n
+      initialValues += NoOp
+      updateExpressions += NoOp
+    }
+
+    // Add every aggregate function to the processor, by kind (declarative or imperative).
+    functions.foreach {
+      case agg: DeclarativeAggregate =>
+        aggBufferAttributes ++= agg.aggBufferAttributes
+        initialValues ++= agg.initialValues
+        updateExpressions ++= agg.updateExpressions
+        evaluateExpressions += agg.evaluateExpression
+      case agg: ImperativeAggregate =>
+        val offset = aggBufferAttributes.size
+        val imperative = BindReferences.bindReference(agg
+          .withNewInputAggBufferOffset(offset)
+          .withNewMutableAggBufferOffset(offset),
+          inputAttributes)
+        imperatives += imperative
+        aggBufferAttributes ++= imperative.aggBufferAttributes
+        val noOps = Seq.fill(imperative.aggBufferAttributes.size)(NoOp)
+        initialValues ++= noOps
+        updateExpressions ++= noOps
+        evaluateExpressions += imperative
+      case other =>
+        sys.error(s"Unsupported Aggregate Function: $other")
+    }
+
+    // Create the projections.
+    val initialProj = newMutableProjection(initialValues, partitionSize.toSeq)
+    val updateProj = newMutableProjection(updateExpressions, aggBufferAttributes ++ inputAttributes)
+    val evalProj = newMutableProjection(evaluateExpressions, aggBufferAttributes)
+
+    // Create the processor
+    new AggregateProcessor(
+      aggBufferAttributes.toArray,
+      initialProj,
+      updateProj,
+      evalProj,
+      imperatives.toArray,
+      partitionSize.isDefined)
+  }
+}
+
+/**
+ * This class manages the processing of a number of aggregate functions. See the documentation of
+ * the object for more information.
+ */
+private[window] final class AggregateProcessor(
+    private[this] val bufferSchema: Array[AttributeReference],
+    private[this] val initialProjection: MutableProjection,
+    private[this] val updateProjection: MutableProjection,
+    private[this] val evaluateProjection: MutableProjection,
+    private[this] val imperatives: Array[ImperativeAggregate],
+    private[this] val trackPartitionSize: Boolean) {
+
+  private[this] val join = new JoinedRow
+  private[this] val numImperatives = imperatives.length
+  private[this] val buffer = new SpecificMutableRow(bufferSchema.toSeq.map(_.dataType))
+  initialProjection.target(buffer)
+  updateProjection.target(buffer)
+
+  /** Reset the buffer to its initial state; `size` is stored first when it is tracked. */
+  def initialize(size: Int): Unit = {
+    // Some initialization expressions are dependent on the partition size so we have to
+    // initialize the size before initializing all other fields, and we have to pass the buffer to
+    // the initialization projection.
+    if (trackPartitionSize) {
+      buffer.setInt(0, size)
+    }
+    initialProjection(buffer)
+    var i = 0
+    while (i < numImperatives) {
+      imperatives(i).initialize(buffer)
+      i += 1
+    }
+  }
+
+  /** Update the buffer with a single input row. */
+  def update(input: InternalRow): Unit = {
+    updateProjection(join(buffer, input))
+    var i = 0
+    while (i < numImperatives) {
+      imperatives(i).update(buffer, input)
+      i += 1
+    }
+  }
+
+  /** Evaluate the buffer and write the results into `target`. */
+  def evaluate(target: MutableRow): Unit =
+  evaluateProjection.target(target)(buffer)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala
new file mode 100644
index 0000000..d6a8019
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Projection
+
+
+/**
+ * Function for comparing boundary values.
+ *
+ * An implementation is given an input row with its index and an output row with its index, and
+ * reports through the returned `Int` how the input row relates to the boundary derived from the
+ * output row.
+ */
+private[window] abstract class BoundOrdering {
+  def compare(
+      inputRow: InternalRow,
+      inputIndex: Int,
+      outputRow: InternalRow,
+      outputIndex: Int): Int
+}
+
+/**
+ * Compare the input index to the bound of the output index, i.e. to `outputIndex + offset`.
+ *
+ * The result is negative when the input index lies before the bound, zero when it is exactly on
+ * the bound and positive when it lies after the bound. The rows themselves are not inspected.
+ *
+ * @param offset number of rows between the output row and the bound.
+ */
+private[window] final case class RowBoundOrdering(offset: Int) extends BoundOrdering {
+  override def compare(
+      inputRow: InternalRow,
+      inputIndex: Int,
+      outputRow: InternalRow,
+      outputIndex: Int): Int = {
+    // Do the arithmetic in Long: in Int, `outputIndex + offset` and the subtraction can wrap
+    // around for offsets close to Int.MaxValue or Int.MinValue, which flips the sign of the
+    // result.
+    val distance = inputIndex.toLong - (outputIndex.toLong + offset)
+    // This equals the plain Int result whenever the true distance fits in an Int; otherwise the
+    // value is clamped so that at least its sign is correct.
+    math.max(Int.MinValue.toLong, math.min(Int.MaxValue.toLong, distance)).toInt
+  }
+}
+
+/**
+ * Compare the value of the input index to the value bound of the output index.
+ *
+ * `current` is evaluated on the input row, `bound` is evaluated on the output row, and the two
+ * results are compared using `ordering`.
+ */
+private[window] final case class RangeBoundOrdering(
+    ordering: Ordering[InternalRow],
+    current: Projection,
+    bound: Projection)
+  extends BoundOrdering {
+
+  override def compare(
+      inputRow: InternalRow,
+      inputIndex: Int,
+      outputRow: InternalRow,
+      outputIndex: Int): Int = {
+    // The indices play no role in a value based comparison.
+    val inputValue = current(inputRow)
+    val boundValue = bound(outputRow)
+    ordering.compare(inputValue, boundValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala
new file mode 100644
index 0000000..ee36c84
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+
+/**
+ * Interface of the buffer that holds the rows of one partition. Window function processing has
+ * to scan the rows of a partition repeatedly, and in absence of a buffer pool (with locking) the
+ * partition is materialized in a row buffer for that purpose.
+ */
+private[window] abstract class RowBuffer {
+
+  /** The number of rows in the buffer. */
+  def size: Int
+
+  /** The next row in the buffer, or null when there are no more rows left. */
+  def next(): InternalRow
+
+  /** Move past the next `n` rows without returning them. */
+  def skip(n: Int): Unit
+
+  /** A new RowBuffer that has the same rows as this one. */
+  def copy(): RowBuffer
+}
+
+/**
+ * A row buffer that keeps its rows in an ArrayBuffer (the number of rows is limited).
+ */
+private[window] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer {
+
+  // Position of the last row that was returned or skipped; -1 before the first row.
+  private[this] var cursor: Int = -1
+
+  /** The number of rows in the backing ArrayBuffer. */
+  override def size: Int = buffer.length
+
+  /** Advance the cursor and return the row at its new position, or null past the end. */
+  override def next(): InternalRow = {
+    cursor += 1
+    if (cursor >= buffer.length) null else buffer(cursor)
+  }
+
+  /** Move the cursor `n` positions forward without reading any row. */
+  override def skip(n: Int): Unit = cursor += n
+
+  /** A new buffer over the very same ArrayBuffer, positioned before the first row. */
+  override def copy(): RowBuffer = new ArrayRowBuffer(buffer)
+}
+
+/**
+ * A row buffer that reads its rows from an UnsafeExternalSorter.
+ */
+private[window] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)
+  extends RowBuffer {
+
+  private[this] val records: UnsafeSorterIterator = sorter.getIterator
+
+  // One row object is reused for every record: next() only re-points it, so a row returned
+  // earlier changes its contents on the following call.
+  private[this] val reusedRow = new UnsafeRow(numFields)
+
+  /** The number of records reported by the sorter iterator. */
+  override def size: Int = records.getNumRecords()
+
+  /** Load the next record and return the reused row pointing at it; null if none is left. */
+  override def next(): InternalRow = {
+    if (!records.hasNext) {
+      null
+    } else {
+      records.loadNext()
+      reusedRow.pointTo(records.getBaseObject, records.getBaseOffset, records.getRecordLength)
+      reusedRow
+    }
+  }
+
+  /** Load (and drop) up to `n` records, stopping early when the iterator runs out. */
+  override def skip(n: Int): Unit = {
+    var remaining = n
+    while (remaining > 0 && records.hasNext) {
+      records.loadNext()
+      remaining -= 1
+    }
+  }
+
+  /** A new buffer that obtains a fresh iterator from the same sorter. */
+  override def copy(): RowBuffer = new ExternalRowBuffer(sorter, numFields)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/67c73052/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
new file mode 100644
index 0000000..7a6a30f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
+
+/**
+ * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
+ * partition. The aggregates are calculated for each row in the group. Special processing
+ * instructions, frames, are used to calculate these aggregates. Frames are processed in the order
+ * specified in the window specification (the ORDER BY ... clause). There are five different frame
+ * types:
+ * - Entire partition: The frame is the entire partition, i.e.
+ *   UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING. For this case, window function will take all
+ *   rows as inputs and be evaluated once.
+ * - Growing frame: We only add new rows into the frame, i.e. UNBOUNDED PRECEDING AND ....
+ *   Every time we move to a new row to process, we add some rows to the frame. We do not remove
+ *   rows from this frame.
+ * - Shrinking frame: We only remove rows from the frame, i.e. ... AND UNBOUNDED FOLLOWING.
+ *   Every time we move to a new row to process, we remove some rows from the frame. We do not add
+ *   rows to this frame.
+ * - Moving frame: Every time we move to a new row to process, we remove some rows from the frame
+ *   and we add some rows to the frame. Examples are:
+ *     1 PRECEDING AND CURRENT ROW and 1 FOLLOWING AND 2 FOLLOWING.
+ * - Offset frame: The frame consists of one row, which is an offset number of rows away from the
+ *   current row. Only [[OffsetWindowFunction]]s can be processed in an offset frame.
+ *
+ * Different frame boundaries can be used in Growing, Shrinking and Moving frames. A frame
+ * boundary can be either Row or Range based:
+ * - Row Based: A row based boundary is based on the position of the row within the partition.
+ *   An offset indicates the number of rows above or below the current row at which the frame for
+ *   the current row starts or ends. For instance, given a row based sliding frame with a lower
+ *   bound offset of -1 and an upper bound offset of +2, the frame for the row with index 5 would
+ *   range from index 4 to index 7.
+ * - Range based: A range based boundary is based on the actual value of the ORDER BY
+ *   expression(s). An offset is used to alter the value of the ORDER BY expression, for
+ *   instance if the current order by expression has a value of 10 and the lower bound offset
+ *   is -3, the resulting lower bound for the current row will be 10 - 3 = 7. This however puts a
+ *   number of constraints on the ORDER BY expressions: there can be only one expression and this
+ *   expression must have a numerical data type. An exception can be made when the offset is 0,
+ *   because no value modification is needed, in this case multiple and non-numeric ORDER BY
+ *   expressions are allowed.
+ *
+ * This is quite an expensive operator because every row for a single group must be in the same
+ * partition and partitions must be sorted according to the grouping and sort order. The operator
+ * requires the planner to take care of the partitioning and sorting.
+ *
+ * The operator is semi-blocking. The window functions and aggregates are calculated one group at
+ * a time, the result will only be made available after the processing for the entire group has
+ * finished. The operator is able to process different frame configurations at the same time. This
+ * is done by delegating the actual frame processing (i.e. calculation of the window functions) to
+ * specialized classes, see [[WindowFunctionFrame]], which take care of their own frame type:
+ * Entire Partition, Sliding, Growing & Shrinking. Boundary evaluation is also delegated to a pair
+ * of specialized classes: [[RowBoundOrdering]] & [[RangeBoundOrdering]].
+ */
+case class WindowExec(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = {
+    // The child's columns come first, followed by one attribute per window expression.
+    val windowAttributes = windowExpression.map(_.toAttribute)
+    child.output ++ windowAttributes
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    if (partitionSpec.nonEmpty) {
+      // Rows are clustered on the partition expressions.
+      ClusteredDistribution(partitionSpec) :: Nil
+    } else {
+      // Without partition expressions all rows are required to be in a single partition.
+      // Only show warning when the number of bytes is larger than 100 MB?
+      logWarning("No Partition Defined for Window operation! Moving all data to a single "
+        + "partition, this can cause serious performance degradation.")
+      AllTuples :: Nil
+    }
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
+    // Order by the partition expressions (ascending) first and by the order specification next.
+    val partitionOrdering = partitionSpec.map(expr => SortOrder(expr, Ascending))
+    Seq(partitionOrdering ++ orderSpec)
+  }
+
+  override def outputOrdering: Seq[SortOrder] = {
+    // The ordering of the child is reported unchanged.
+    child.outputOrdering
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
+   * @return a bound ordering object.
+   */
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RowFrame =>
+        // A row based boundary only depends on the position of the rows.
+        RowBoundOrdering(offset)
+
+      case RangeFrame =>
+        val (exprs, current, bound) =
+          if (offset == 0) {
+            // Use the entire order expression when the offset is 0: both sides are projected
+            // onto the same ORDER BY expressions, each side with a projection of its own.
+            val orderExprs = orderSpec.map(_.child)
+            def project() = newMutableProjection(orderExprs, child.output)
+            (orderSpec, project(), project())
+          } else if (orderSpec.size == 1) {
+            // Use the single order expression when the offset is non-zero.
+            val sortExpr = orderSpec.head
+            val expr = sortExpr.child
+            // The projection which returns the current 'value'.
+            val currentProjection = newMutableProjection(expr :: Nil, child.output)
+            // Flip the sign of the offset when the order is descending.
+            val boundOffset = sortExpr.direction match {
+              case Ascending => offset
+              case Descending => -offset
+            }
+            // The projection which returns the current 'value' modified by adding the offset;
+            // the offset literal is cast to the data type of the order expression.
+            val offsetExpr = Cast(Literal.create(boundOffset, IntegerType), expr.dataType)
+            val boundProjection =
+              newMutableProjection(Add(expr, offsetExpr) :: Nil, child.output)
+            (sortExpr :: Nil, currentProjection, boundProjection)
+          } else {
+            sys.error("Non-Zero range offsets are not supported for windows " +
+              "with multiple order expressions.")
+          }
+
+        // The ordering compares the result of the current value projection to the result of the
+        // bound value projection, so its expressions are bound by position. It is constructed
+        // manually because we want to use Code Generation (if it is enabled).
+        val sortExprs = exprs.zipWithIndex.map { case (order, ordinal) =>
+          SortOrder(BoundReference(ordinal, order.dataType, order.nullable), order.direction)
+        }
+        RangeBoundOrdering(newOrdering(sortExprs, Nil), current, bound)
+    }
+  }
+
+  /**
+   * Collection containing an entry for each window frame to process. Each entry contains a
+   * frame's WindowExpressions and a factory function for its WindowFunctionFrame.
+   */
+  private[this] lazy val windowFrameExpressionFactoryPairs = {
+    // A frame is identified by the kind of function evaluated in it ("AGGREGATE" or "OFFSET"),
+    // its frame type, and its start and end boundary as produced by the FrameBoundary extractor.
+    type FrameKey = (String, FrameType, Option[Int], Option[Int])
+    type ExpressionBuffer = mutable.Buffer[Expression]
+    // Per frame: the window expressions (first buffer) and the functions inside them (second).
+    val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
+
+    // Add a window expression and its function to the map for a given frame.
+    def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
+      val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
+      val (es, fns) = framedFunctions.getOrElseUpdate(
+        key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
+      es += e
+      fns += fn
+    }
+
+    // Collect all valid window functions and group them by their frame.
+    windowExpression.foreach { x =>
+      x.foreach {
+        case e @ WindowExpression(function, spec) =>
+          val frame = spec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
+          function match {
+            case AggregateExpression(f, _, _, _) => collect("AGGREGATE", frame, e, f)
+            case f: AggregateWindowFunction => collect("AGGREGATE", frame, e, f)
+            case f: OffsetWindowFunction => collect("OFFSET", frame, e, f)
+            case f => sys.error(s"Unsupported window function: $f")
+          }
+        case _ =>
+      }
+    }
+
+    // Map the groups to a (unbound) expression and frame factory pair.
+    var numExpressions = 0
+    framedFunctions.toSeq.map {
+      case (key, (expressions, functionSeq)) =>
+        // Position of this frame's first expression within the expressions of all frames.
+        val ordinal = numExpressions
+        val functions = functionSeq.toArray
+
+        // Construct an aggregate processor if we need one. This is a def on purpose: every use
+        // (i.e. every frame created by a factory below) gets a processor of its own.
+        def processor = AggregateProcessor(
+          functions,
+          ordinal,
+          child.output,
+          (expressions, schema) =>
+            newMutableProjection(expressions, schema, subexpressionEliminationEnabled))
+
+        // Create the factory
+        val factory = key match {
+          // Offset Frame
+          case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
+            target: MutableRow =>
+              new OffsetWindowFunctionFrame(
+                target,
+                ordinal,
+                // OFFSET frame functions are guaranteed be OffsetWindowFunctions.
+                functions.map(_.asInstanceOf[OffsetWindowFunction]),
+                child.output,
+                (expressions, schema) =>
+                  newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
+                offset)
+
+          // Growing Frame.
+          case ("AGGREGATE", frameType, None, Some(high)) =>
+            target: MutableRow => {
+              new UnboundedPrecedingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, high))
+            }
+
+          // Shrinking Frame.
+          case ("AGGREGATE", frameType, Some(low), None) =>
+            target: MutableRow => {
+              new UnboundedFollowingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, low))
+            }
+
+          // Moving Frame.
+          case ("AGGREGATE", frameType, Some(low), Some(high)) =>
+            target: MutableRow => {
+              new SlidingWindowFunctionFrame(
+                target,
+                processor,
+                createBoundOrdering(frameType, low),
+                createBoundOrdering(frameType, high))
+            }
+
+          // Entire Partition Frame.
+          case ("AGGREGATE", frameType, None, None) =>
+            target: MutableRow => {
+              new UnboundedWindowFunctionFrame(target, processor)
+            }
+
+          // Any other combination (e.g. an offset function over a range frame, or over a row
+          // frame whose bounds differ) cannot be evaluated. Fail with a descriptive message
+          // instead of a bare MatchError.
+          case _ =>
+            sys.error(s"Unsupported window frame: $key")
+        }
+
+        // Keep track of the number of expressions. This is a side-effect in a map...
+        numExpressions += expressions.size
+
+        // Create the Frame Expression - Factory pair.
+        (expressions, factory)
+    }
+  }
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
+    // Results of window expressions will be on the right side of child's output, so the i-th
+    // expression is replaced by a reference to ordinal `child.output.size + i`.
+    val firstResultOrdinal = child.output.size
+    val unboundToRefMap = expressions.zipWithIndex.map { case (expr, idx) =>
+      expr -> BoundReference(firstResultOrdinal + idx, expr.dataType, expr.nullable)
+    }.toMap
+
+    // Substitute the references into the window expressions and project child + window output.
+    val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
+    val projectList = child.output ++ patchedWindowExpression
+    UnsafeProjection.create(projectList, child.output)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    // Evaluates the window expressions for each partition of the child RDD. The incoming rows
+    // are consumed one group (rows with an equal value for the partition expressions) at a time:
+    // a group is buffered completely, handed to all frames, and then emitted row by row with the
+    // window function results attached.
+
+    // Unwrap the expressions and factories from the map.
+    val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
+    val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+
+    // Start processing.
+    child.execute().mapPartitions { stream =>
+      new Iterator[InternalRow] {
+
+        // Get all relevant projections.
+        val result = createResultProjection(expressions)
+        val grouping = UnsafeProjection.create(partitionSpec, child.output)
+
+        // Manage the stream and the grouping. `nextRow` is a one row look-ahead into the input
+        // stream and `nextGroup` is the grouping key of that row; both are null at the end.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow(): Unit = {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition. The rows of a group are kept in `rows` at first and are
+        // moved to `sorter` once the group reaches 4096 rows; `rowBuffer` wraps whichever of the
+        // two holds the current group.
+        val rows = ArrayBuffer.empty[UnsafeRow]
+        val inputFields = child.output.length
+        var sorter: UnsafeExternalSorter = null
+        var rowBuffer: RowBuffer = null
+        // All frames are created once, with this single row as the target they are given.
+        val windowFunctionResult = new SpecificMutableRow(expressions.map(_.dataType))
+        val frames = factories.map(_(windowFunctionResult))
+        val numFrames = frames.length
+        private[this] def fetchNextPartition(): Unit = {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          if (sorter != null) {
+            // the last sorter of this task will be cleaned up via task completion listener
+            sorter.cleanupResources()
+            sorter = null
+          } else {
+            rows.clear()
+          }
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
+            if (sorter == null) {
+              // The row is copied before it is buffered.
+              rows += nextRow.copy()
+
+              if (rows.length >= 4096) {
+                // We will not sort the rows, so prefixComparator and recordComparator are null.
+                sorter = UnsafeExternalSorter.create(
+                  TaskContext.get().taskMemoryManager(),
+                  SparkEnv.get.blockManager,
+                  SparkEnv.get.serializerManager,
+                  TaskContext.get(),
+                  null,
+                  null,
+                  1024,
+                  SparkEnv.get.memoryManager.pageSizeBytes,
+                  SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
+                    UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD),
+                  false)
+                // Move everything buffered so far into the sorter.
+                rows.foreach { r =>
+                  sorter.insertRecord(r.getBaseObject, r.getBaseOffset, r.getSizeInBytes, 0, false)
+                }
+                rows.clear()
+              }
+            } else {
+              sorter.insertRecord(nextRow.getBaseObject, nextRow.getBaseOffset,
+                nextRow.getSizeInBytes, 0, false)
+            }
+            fetchNextRow()
+          }
+          if (sorter != null) {
+            rowBuffer = new ExternalRowBuffer(sorter, inputFields)
+          } else {
+            rowBuffer = new ArrayRowBuffer(rows)
+          }
+
+          // Setup the frames. Every frame is prepared with a copy of the row buffer.
+          var i = 0
+          while (i < numFrames) {
+            frames(i).prepare(rowBuffer.copy())
+            i += 1
+          }
+
+          // Setup iteration
+          rowIndex = 0
+          rowsSize = rowBuffer.size
+        }
+
+        // Iteration: index of the next row to emit within the current group, and the number of
+        // rows in that group.
+        var rowIndex = 0
+        var rowsSize = 0L
+
+        // There is more output while the current group has rows left or the input has more rows.
+        override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable
+
+        val join = new JoinedRow
+        override final def next(): InternalRow = {
+          // Load the next partition if we need to.
+          if (rowIndex >= rowsSize && nextRowAvailable) {
+            fetchNextPartition()
+          }
+
+          if (rowIndex < rowsSize) {
+            // Get the results for the window frames.
+            var i = 0
+            val current = rowBuffer.next()
+            while (i < numFrames) {
+              frames(i).write(rowIndex, current)
+              i += 1
+            }
+
+            // 'Merge' the input row with the window function result
+            join(current, windowFunctionResult)
+            rowIndex += 1
+
+            // Return the projection.
+            result(join)
+          } else throw new NoSuchElementException
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org