Posted to commits@spark.apache.org by li...@apache.org on 2017/07/30 03:36:04 UTC

spark git commit: Revert "[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary"

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 66fa6bd6d -> e2062b9c1


Revert "[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary"

This reverts commit 66fa6bd6d48b08625ecedfcb5a976678141300bd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2062b9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2062b9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2062b9c

Branch: refs/heads/branch-2.2
Commit: e2062b9c1106433799d2874dfe17e181fe1ecb5e
Parents: 66fa6bd
Author: gatorsmile <ga...@gmail.com>
Authored: Sat Jul 29 20:35:22 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Jul 29 20:35:22 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  15 +-
 .../sql/catalyst/analysis/TypeCoercion.scala    |  23 --
 .../sql/catalyst/expressions/package.scala      |   7 -
 .../expressions/windowExpressions.scala         | 328 +++++++++----------
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  20 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |   2 +
 .../catalyst/analysis/AnalysisErrorSuite.scala  |   2 +-
 .../catalyst/analysis/TypeCoercionSuite.scala   |  36 --
 .../catalyst/parser/ExpressionParserSuite.scala |  17 +-
 .../sql/catalyst/parser/PlanParserSuite.scala   |   2 +-
 .../sql/catalyst/trees/TreeNodeSuite.scala      |  27 +-
 .../spark/sql/execution/window/WindowExec.scala | 103 +++---
 .../spark/sql/expressions/WindowSpec.scala      |  33 +-
 .../test/resources/sql-tests/inputs/window.sql  |  24 +-
 .../resources/sql-tests/results/window.sql.out  | 146 ++-------
 .../sql/DataFrameWindowFunctionsSuite.scala     |  42 ---
 .../catalyst/ExpressionSQLBuilderSuite.scala    |  10 +-
 17 files changed, 304 insertions(+), 533 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2e300c5..2e3ac3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -106,9 +106,11 @@ trait CheckAnalysis extends PredicateHelper {
           case w @ WindowExpression(AggregateExpression(_, _, true, _), _) =>
             failAnalysis(s"Distinct window functions are not supported: $w")
 
-          case w @ WindowExpression(_: OffsetWindowFunction,
-            WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
-             if order.isEmpty || !frame.isOffset =>
+          case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
+               SpecifiedWindowFrame(frame,
+                 FrameBoundary(l),
+                 FrameBoundary(h))))
+             if order.isEmpty || frame != RowFrame || l != h =>
             failAnalysis("An offset window function can only be evaluated in an ordered " +
               s"row-based window frame with a single offset: $w")
 
@@ -117,10 +119,15 @@ trait CheckAnalysis extends PredicateHelper {
             // function.
             e match {
               case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
-                w
               case _ =>
                 failAnalysis(s"Expression '$e' not supported within a window function.")
             }
+            // Make sure the window specification is valid.
+            s.validate match {
+              case Some(m) =>
+                failAnalysis(s"Window specification $s is not valid because $m")
+              case None => w
+            }
 
           case s @ ScalarSubquery(query, conditions, _) =>
             checkAnalysis(query)
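
A minimal sketch (not part of the patch) of what the reinstated guard accepts:
an offset window function such as lead/lag only passes analysis when its frame
is an ordered, row-based frame whose start and end extract to the same offset.

    import org.apache.spark.sql.catalyst.expressions._

    // Accepted: ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING, i.e. lead(x, 1).
    val ok  = SpecifiedWindowFrame(RowFrame, ValueFollowing(1), ValueFollowing(1))
    // Rejected: FrameBoundary extracts Some(-1) and Some(1), so l != h above.
    val bad = SpecifiedWindowFrame(RowFrame, ValuePreceding(1), ValueFollowing(1))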

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index a2fcad9..e1dd010 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -58,7 +58,6 @@ object TypeCoercion {
       PropagateTypes ::
       ImplicitTypeCasts ::
       DateTimeOperations ::
-      WindowFrameCoercion ::
       Nil
 
   // See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
@@ -786,26 +785,4 @@ object TypeCoercion {
       Option(ret)
     }
   }
-
-  /**
-   * Cast WindowFrame boundaries to the type they operate upon.
-   */
-  object WindowFrameCoercion extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
-      case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
-          if order.resolved =>
-        s.copy(frameSpecification = SpecifiedWindowFrame(
-          RangeFrame,
-          createBoundaryCast(lower, order.dataType),
-          createBoundaryCast(upper, order.dataType)))
-    }
-
-    private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
-      boundary match {
-        case e: SpecialFrameBoundary => e
-        case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
-        case _ => boundary
-      }
-    }
-  }
 }
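
The WindowFrameCoercion rule deleted here is what let RANGE boundaries differ
in type from the ORDER BY expression: it cast each non-special boundary to the
ordering column's type. A before/after sketch of the rule's effect, using the
pre-revert SpecifiedWindowFrame(frameType, lower, upper) signature and
mirroring the TypeCoercionSuite test removed further down:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types.LongType

    // Input: an Int boundary in a RANGE frame ordered by a Long expression.
    val before = SpecifiedWindowFrame(RangeFrame, Literal(3), Literal(2147483648L))
    // Output: the Int boundary is wrapped in a cast to the ordering type.
    val after  = SpecifiedWindowFrame(RangeFrame, Cast(Literal(3), LongType), Literal(2147483648L))

With the rule gone, boundaries must already be constant integers; the parser
change below enforces this.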

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 1a48995..4c8b177 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -75,13 +75,6 @@ package object expressions  {
   }
 
   /**
-   * An identity projection. This returns the input row.
-   */
-  object IdentityProjection extends Projection {
-    override def apply(row: InternalRow): InternalRow = row
-  }
-
-  /**
    * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
    * column of the new row. If the schema of the input row is specified, then the given expression
    * will be bound to that schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 62c4832..3719042 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.Locale
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, NoOp}
@@ -42,7 +43,34 @@ case class WindowSpecDefinition(
     orderSpec: Seq[SortOrder],
     frameSpecification: WindowFrame) extends Expression with WindowSpec with Unevaluable {
 
-  override def children: Seq[Expression] = partitionSpec ++ orderSpec :+ frameSpecification
+  def validate: Option[String] = frameSpecification match {
+    case UnspecifiedFrame =>
+      Some("Found an UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " +
+        "during analysis. Please file a bug report.")
+    case frame: SpecifiedWindowFrame => frame.validate.orElse {
+      def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+        if (orderSpec.length > 1)  {
+          // It is not allowed to have a value-based PRECEDING and FOLLOWING
+          // as the boundary of a Range Window Frame.
+          Some("This Range Window Frame only accepts at most one ORDER BY expression.")
+        } else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) {
+          Some("The data type of the expression in the ORDER BY clause should be a numeric type.")
+        } else {
+          None
+        }
+      }
+
+      (frame.frameType, frame.frameStart, frame.frameEnd) match {
+        case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame()
+        case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame()
+        case (_, _, _) => None
+      }
+    }
+  }
+
+  override def children: Seq[Expression] = partitionSpec ++ orderSpec
 
   override lazy val resolved: Boolean =
     childrenResolved && checkInputDataTypes().isSuccess &&
@@ -50,46 +78,23 @@ case class WindowSpecDefinition(
 
   override def nullable: Boolean = true
   override def foldable: Boolean = false
-  override def dataType: DataType = throw new UnsupportedOperationException("dataType")
+  override def dataType: DataType = throw new UnsupportedOperationException
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    frameSpecification match {
-      case UnspecifiedFrame =>
-        TypeCheckFailure(
-          "Cannot use an UnspecifiedFrame. This should have been converted during analysis. " +
-            "Please file a bug report.")
-      case f: SpecifiedWindowFrame if f.frameType == RangeFrame && !f.isUnbounded &&
-          orderSpec.isEmpty =>
-        TypeCheckFailure(
-          "A range window frame cannot be used in an unordered window specification.")
-      case f: SpecifiedWindowFrame if f.frameType == RangeFrame && f.isValueBound &&
-          orderSpec.size > 1 =>
-        TypeCheckFailure(
-          s"A range window frame with value boundaries cannot be used in a window specification " +
-            s"with multiple order by expressions: ${orderSpec.mkString(",")}")
-      case f: SpecifiedWindowFrame if f.frameType == RangeFrame && f.isValueBound &&
-          !isValidFrameType(f.valueBoundary.head.dataType) =>
-        TypeCheckFailure(
-          s"The data type '${orderSpec.head.dataType}' used in the order specification does " +
-            s"not match the data type '${f.valueBoundary.head.dataType}' which is used in the " +
-            "range frame.")
-      case _ => TypeCheckSuccess
+  override def sql: String = {
+    val partition = if (partitionSpec.isEmpty) {
+      ""
+    } else {
+      "PARTITION BY " + partitionSpec.map(_.sql).mkString(", ") + " "
     }
-  }
 
-  override def sql: String = {
-    def toSql(exprs: Seq[Expression], prefix: String): Seq[String] = {
-      Seq(exprs).filter(_.nonEmpty).map(_.map(_.sql).mkString(prefix, ", ", ""))
+    val order = if (orderSpec.isEmpty) {
+      ""
+    } else {
+      "ORDER BY " + orderSpec.map(_.sql).mkString(", ") + " "
     }
 
-    val elements =
-      toSql(partitionSpec, "PARTITION BY ") ++
-        toSql(orderSpec, "ORDER BY ") ++
-        Seq(frameSpecification.sql)
-    elements.mkString("(", " ", ")")
+    s"($partition$order${frameSpecification.toString})"
   }
-
-  private def isValidFrameType(ft: DataType): Boolean = orderSpec.head.dataType == ft
 }
 
 /**
@@ -101,26 +106,22 @@ case class WindowSpecReference(name: String) extends WindowSpec
 /**
  * The trait used to represent the type of a Window Frame.
  */
-sealed trait FrameType {
-  def inputType: AbstractDataType
-  def sql: String
-}
+sealed trait FrameType
 
 /**
- * RowFrame treats rows in a partition individually. Values used in a row frame are considered
- * to be physical offsets.
+ * RowFrame treats rows in a partition individually. When a [[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
+ * as a physical offset.
  * For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
  * from the row that precedes the current row to the row that follows it.
  */
-case object RowFrame extends FrameType {
-  override def inputType: AbstractDataType = IntegerType
-  override def sql: String = "ROWS"
-}
+case object RowFrame extends FrameType
 
 /**
- * RangeFrame treats rows in a partition as groups of peers. All rows having the same `ORDER BY`
- * ordering are considered as peers. Values used in a range frame are considered to be logical
- * offsets.
+ * RangeFrame treats rows in a partition as groups of peers.
+ * All rows having the same `ORDER BY` ordering are considered as peers.
+ * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its [[FrameBoundary]],
+ * the value is considered as a logical offset.
  * For example, assuming the value of the current row's `ORDER BY` expression `expr` is `v`,
  * `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
  * `expr` are in the range of [v-1, v+1].
@@ -128,144 +129,138 @@ case object RowFrame extends FrameType {
  * If the `ORDER BY` clause is not defined, all rows in the partition are considered peers
  * of the current row.
  */
-case object RangeFrame extends FrameType {
-  override def inputType: AbstractDataType = NumericType
-  override def sql: String = "RANGE"
-}
+case object RangeFrame extends FrameType
 
 /**
- * The trait used to represent special boundaries used in a window frame.
+ * The trait used to represent the type of a Window Frame Boundary.
  */
-sealed trait SpecialFrameBoundary extends Expression with Unevaluable {
-  override def children: Seq[Expression] = Nil
-  override def dataType: DataType = NullType
-  override def foldable: Boolean = false
-  override def nullable: Boolean = false
-}
-
-/** UNBOUNDED boundary. */
-case object UnboundedPreceding extends SpecialFrameBoundary {
-  override def sql: String = "UNBOUNDED PRECEDING"
-}
-
-case object UnboundedFollowing extends SpecialFrameBoundary {
-  override def sql: String = "UNBOUNDED FOLLOWING"
-}
-
-/** CURRENT ROW boundary. */
-case object CurrentRow extends SpecialFrameBoundary {
-  override def sql: String = "CURRENT ROW"
+sealed trait FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean
 }
 
 /**
- * The trait used to represent a Window Frame.
+ * Extractor that makes working with frame boundaries easier.
  */
-sealed trait WindowFrame extends Expression with Unevaluable {
-  override def children: Seq[Expression] = Nil
-  override def dataType: DataType = throw new UnsupportedOperationException("dataType")
-  override def foldable: Boolean = false
-  override def nullable: Boolean = false
+object FrameBoundary {
+  def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
+  def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
+    case CurrentRow => Some(0)
+    case ValuePreceding(offset) => Some(-offset)
+    case ValueFollowing(offset) => Some(offset)
+    case _ => None
+  }
 }
 
-/** Used as a placeholder when a frame specification is not defined. */
-case object UnspecifiedFrame extends WindowFrame
-
-/**
- * A specified Window Frame. The val lower/upper can be either a foldable [[Expression]] or a
- * [[SpecialFrameBoundary]].
- */
-case class SpecifiedWindowFrame(
-    frameType: FrameType,
-    lower: Expression,
-    upper: Expression)
-  extends WindowFrame {
-
-  override def children: Seq[Expression] = lower :: upper :: Nil
+/** UNBOUNDED PRECEDING boundary. */
+case object UnboundedPreceding extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => true
+    case vp: ValuePreceding => true
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
+  }
 
-  lazy val valueBoundary: Seq[Expression] =
-    children.filterNot(_.isInstanceOf[SpecialFrameBoundary])
+  override def toString: String = "UNBOUNDED PRECEDING"
+}
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    // Check lower value.
-    val lowerCheck = checkBoundary(lower, "lower")
-    if (lowerCheck.isFailure) {
-      return lowerCheck
-    }
+/** <value> PRECEDING boundary. */
+case class ValuePreceding(value: Int) extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case ValuePreceding(anotherValue) => value >= anotherValue
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
+  }
 
-    // Check upper value.
-    val upperCheck = checkBoundary(upper, "upper")
-    if (upperCheck.isFailure) {
-      return upperCheck
-    }
+  override def toString: String = s"$value PRECEDING"
+}
 
-    // Check combination (of expressions).
-    (lower, upper) match {
-      case (l: Expression, u: Expression) if !isValidFrameBoundary(l, u) =>
-        TypeCheckFailure(s"Window frame upper bound '$upper' does not follow the lower bound " +
-          s"'$lower'.")
-      case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
-      case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
-      case (l: Expression, u: Expression) if l.dataType != u.dataType =>
-        TypeCheckFailure(
-          s"Window frame bounds '$lower' and '$upper' do not have the same data type: " +
-            s"'${l.dataType.catalogString}' <> '${u.dataType.catalogString}'")
-      case (l: Expression, u: Expression) if isGreaterThan(l, u) =>
-        TypeCheckFailure(
-          "The lower bound of a window frame must be less than or equal to the upper bound")
-      case _ => TypeCheckSuccess
-    }
+/** CURRENT ROW boundary. */
+case object CurrentRow extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => true
+    case vf: ValueFollowing => true
+    case UnboundedFollowing => true
   }
 
-  override def sql: String = {
-    val lowerSql = boundarySql(lower)
-    val upperSql = boundarySql(upper)
-    s"${frameType.sql} BETWEEN $lowerSql AND $upperSql"
-  }
+  override def toString: String = "CURRENT ROW"
+}
 
-  def isUnbounded: Boolean = lower == UnboundedPreceding && upper == UnboundedFollowing
+/** <value> FOLLOWING boundary. */
+case class ValueFollowing(value: Int) extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => false
+    case ValueFollowing(anotherValue) => value <= anotherValue
+    case UnboundedFollowing => true
+  }
 
-  def isValueBound: Boolean = valueBoundary.nonEmpty
+  override def toString: String = s"$value FOLLOWING"
+}
 
-  def isOffset: Boolean = (lower, upper) match {
-    case (l: Expression, u: Expression) => frameType == RowFrame && l == u
-    case _ => false
+/** UNBOUNDED FOLLOWING boundary. */
+case object UnboundedFollowing extends FrameBoundary {
+  def notFollows(other: FrameBoundary): Boolean = other match {
+    case UnboundedPreceding => false
+    case vp: ValuePreceding => false
+    case CurrentRow => false
+    case vf: ValueFollowing => false
+    case UnboundedFollowing => true
   }
 
-  private def boundarySql(expr: Expression): String = expr match {
-    case e: SpecialFrameBoundary => e.sql
-    case UnaryMinus(n) => n.sql + " PRECEDING"
-    case e: Expression => e.sql + " FOLLOWING"
-  }
+  override def toString: String = "UNBOUNDED FOLLOWING"
+}
 
-  private def isGreaterThan(l: Expression, r: Expression): Boolean = {
-    GreaterThan(l, r).eval().asInstanceOf[Boolean]
-  }
+/**
+ * The trait used to represent a Window Frame.
+ */
+sealed trait WindowFrame
+
+/** Used as a placeholder when a frame specification is not defined. */
+case object UnspecifiedFrame extends WindowFrame
 
-  private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match {
-    case _: SpecialFrameBoundary => TypeCheckSuccess
-    case e: Expression if !e.foldable =>
-      TypeCheckFailure(s"Window frame $location bound '$e' is not a literal.")
-    case e: Expression if !frameType.inputType.acceptsType(e.dataType) =>
-      TypeCheckFailure(
-        s"The data type of the $location bound '${e.dataType} does not match " +
-          s"the expected data type '${frameType.inputType}'.")
-    case _ => TypeCheckSuccess
+/** A specified Window Frame. */
+case class SpecifiedWindowFrame(
+    frameType: FrameType,
+    frameStart: FrameBoundary,
+    frameEnd: FrameBoundary) extends WindowFrame {
+
+  /** Returns the reason this WindowFrame is invalid, or None if it is valid. */
+  def validate: Option[String] = (frameType, frameStart, frameEnd) match {
+    case (_, UnboundedFollowing, _) =>
+      Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
+    case (_, _, UnboundedPreceding) =>
+      Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
+    // case (RowFrame, start, end) => ??? RowFrame specific rule
+    // case (RangeFrame, start, end) => ??? RangeFrame specific rule
+    case (_, start, end) =>
+      if (start.notFollows(end)) {
+        None
+      } else {
+        val reason =
+          s"The end of this Window Frame $end is smaller than the start of " +
+          s"this Window Frame $start."
+        Some(reason)
+      }
   }
 
-  private def isValidFrameBoundary(l: Expression, u: Expression): Boolean = {
-    (l, u) match {
-      case (UnboundedFollowing, _) => false
-      case (_, UnboundedPreceding) => false
-      case _ => true
-    }
+  override def toString: String = frameType match {
+    case RowFrame => s"ROWS BETWEEN $frameStart AND $frameEnd"
+    case RangeFrame => s"RANGE BETWEEN $frameStart AND $frameEnd"
   }
 }
 
 object SpecifiedWindowFrame {
   /**
+   *
    * @param hasOrderSpecification If the window spec has order by expressions.
    * @param acceptWindowFrame If the window function accepts user-specified frame.
-   * @return the default window frame.
+   * @return
    */
   def defaultWindowFrame(
       hasOrderSpecification: Boolean,
@@ -356,25 +351,20 @@ abstract class OffsetWindowFunction
 
   override def nullable: Boolean = default == null || default.nullable || input.nullable
 
-  override lazy val frame: WindowFrame = {
+  override lazy val frame = {
+    // This will be triggered by the Analyzer.
+    val offsetValue = offset.eval() match {
+      case o: Int => o
+      case x => throw new AnalysisException(
+        s"Offset expression must be a foldable integer expression: $x")
+    }
     val boundary = direction match {
-      case Ascending => offset
-      case Descending => UnaryMinus(offset)
+      case Ascending => ValueFollowing(offsetValue)
+      case Descending => ValuePreceding(offsetValue)
     }
     SpecifiedWindowFrame(RowFrame, boundary, boundary)
   }
 
-  override def checkInputDataTypes(): TypeCheckResult = {
-    val check = super.checkInputDataTypes()
-    if (check.isFailure) {
-      check
-    } else if (!offset.foldable) {
-      TypeCheckFailure(s"Offset expression '$offset' must be a literal.")
-    } else {
-      TypeCheckSuccess
-    }
-  }
-
   override def dataType: DataType = input.dataType
 
   override def inputTypes: Seq[AbstractDataType] =
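
A short sketch (not from the patch) showing how the restored boundary API fits
together; every type used here is defined in the hunk above:

    import org.apache.spark.sql.catalyst.expressions._

    val frame = SpecifiedWindowFrame(RowFrame, ValuePreceding(10), ValueFollowing(5))
    frame.validate                     // None: 10 PRECEDING does not follow 5 FOLLOWING
    frame.toString                     // "ROWS BETWEEN 10 PRECEDING AND 5 FOLLOWING"

    // The FrameBoundary extractor maps bounded offsets to integers and the
    // unbounded boundaries to None; WindowExec keys its frames on these values.
    FrameBoundary(ValuePreceding(10))  // Some(-10)
    FrameBoundary(ValueFollowing(5))   // Some(5)
    FrameBoundary(UnboundedPreceding)  // None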

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8ea18fa..d1c9332 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1142,26 +1142,32 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   }
 
   /**
-   * Create or resolve a frame boundary expression.
+   * Create or resolve a [[FrameBoundary]]. Simple math expressions are allowed for Value
+   * Preceding/Following boundaries. These expressions must be constant (foldable) and return an
+   * integer value.
    */
-  override def visitFrameBound(ctx: FrameBoundContext): Expression = withOrigin(ctx) {
-    def value: Expression = {
+  override def visitFrameBound(ctx: FrameBoundContext): FrameBoundary = withOrigin(ctx) {
+    // We currently only allow foldable integers.
+    def value: Int = {
       val e = expression(ctx.expression)
-      validate(e.resolved && e.foldable, "Frame bound value must be a literal.", ctx)
-      e
+      validate(e.resolved && e.foldable && e.dataType == IntegerType,
+        "Frame bound value must be a constant integer.",
+        ctx)
+      e.eval().asInstanceOf[Int]
     }
 
+    // Create the FrameBoundary
     ctx.boundType.getType match {
       case SqlBaseParser.PRECEDING if ctx.UNBOUNDED != null =>
         UnboundedPreceding
       case SqlBaseParser.PRECEDING =>
-        UnaryMinus(value)
+        ValuePreceding(value)
       case SqlBaseParser.CURRENT =>
         CurrentRow
       case SqlBaseParser.FOLLOWING if ctx.UNBOUNDED != null =>
         UnboundedFollowing
       case SqlBaseParser.FOLLOWING =>
-        value
+        ValueFollowing(value)
     }
   }
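
With this change a frame bound is folded to an integer while parsing instead
of being kept as an expression. A hedged sketch of the observable behaviour,
mirroring the ExpressionParserSuite changes below (CatalystSqlParser is the
stock catalyst parser entry point):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // "3 + 1" is foldable and integer-typed, so it parses to ValueFollowing(4).
    CatalystSqlParser.parseExpression(
      "foo(*) over (partition by a order by b rows 3 + 1 following)")

    // Non-integer or non-foldable bounds now fail at parse time with
    // "Frame bound value must be a constant integer.", e.g.
    //   ... rows 10.0 preceding
    //   ... rows exp(b) preceding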
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 6fb5a99..ae5c513 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -688,6 +688,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case id: FunctionIdentifier => true
     case spec: BucketSpec => true
     case catalog: CatalogTable => true
+    case boundary: FrameBoundary => true
+    case frame: WindowFrame => true
     case partition: Partitioning => true
     case resource: FunctionResource => true
     case broadcast: BroadcastMode => true

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index adc5032..5050318 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -191,7 +191,7 @@ class AnalysisErrorSuite extends AnalysisTest {
         WindowSpecDefinition(
           UnresolvedAttribute("a") :: Nil,
           SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
-          SpecifiedWindowFrame(RangeFrame, Literal(1), Literal(2)))).as('window)),
+          SpecifiedWindowFrame(RangeFrame, ValueFollowing(1), ValueFollowing(2)))).as('window)),
     "window frame" :: "must match the required frame" :: Nil)
 
   errorTest(

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 345ec0f..2624f558 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -998,42 +998,6 @@ class TypeCoercionSuite extends PlanTest {
       EqualTo(Literal(Array(1, 2)), Literal("123")),
       EqualTo(Literal(Array(1, 2)), Literal("123")))
   }
-
-  test("cast WindowFrame boundaries to the type they operate upon") {
-    // Can cast frame boundaries to order dataType.
-    ruleTest(WindowFrameCoercion,
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal(1L), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, Literal(3), Literal(2147483648L))),
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal(1L), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, Cast(3, LongType), Literal(2147483648L)))
-    )
-    // Cannot cast frame boundaries to order dataType.
-    ruleTest(WindowFrameCoercion,
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal.default(DateType), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, Literal(10.0), Literal(2147483648L))),
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal.default(DateType), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, Literal(10.0), Literal(2147483648L)))
-    )
-    // Should not cast SpecialFrameBoundary.
-    ruleTest(WindowFrameCoercion,
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal(1L), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, CurrentRow, UnboundedFollowing)),
-      windowSpec(
-        Seq(UnresolvedAttribute("a")),
-        Seq(SortOrder(Literal(1L), Ascending)),
-        SpecifiedWindowFrame(RangeFrame, CurrentRow, UnboundedFollowing))
-    )
-  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index e770cc3..f062191 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -262,17 +262,16 @@ class ExpressionParserSuite extends PlanTest {
     // Range/Row
     val frameTypes = Seq(("rows", RowFrame), ("range", RangeFrame))
     val boundaries = Seq(
-      ("10 preceding", -Literal(10), CurrentRow),
-      ("2147483648 preceding", -Literal(2147483648L), CurrentRow),
-      ("3 + 1 following", Add(Literal(3), Literal(1)), CurrentRow),
+      ("10 preceding", ValuePreceding(10), CurrentRow),
+      ("3 + 1 following", ValueFollowing(4), CurrentRow), // Will fail during analysis
       ("unbounded preceding", UnboundedPreceding, CurrentRow),
       ("unbounded following", UnboundedFollowing, CurrentRow), // Will fail during analysis
       ("between unbounded preceding and current row", UnboundedPreceding, CurrentRow),
       ("between unbounded preceding and unbounded following",
         UnboundedPreceding, UnboundedFollowing),
-      ("between 10 preceding and current row", -Literal(10), CurrentRow),
-      ("between current row and 5 following", CurrentRow, Literal(5)),
-      ("between 10 preceding and 5 following", -Literal(10), Literal(5))
+      ("between 10 preceding and current row", ValuePreceding(10), CurrentRow),
+      ("between current row and 5 following", CurrentRow, ValueFollowing(5)),
+      ("between 10 preceding and 5 following", ValuePreceding(10), ValueFollowing(5))
     )
     frameTypes.foreach {
       case (frameTypeSql, frameType) =>
@@ -284,9 +283,13 @@ class ExpressionParserSuite extends PlanTest {
         }
     }
 
+    // We cannot use non integer constants.
+    intercept("foo(*) over (partition by a order by b rows 10.0 preceding)",
+      "Frame bound value must be a constant integer.")
+
     // We cannot use an arbitrary expression.
     intercept("foo(*) over (partition by a order by b rows exp(b) preceding)",
-      "Frame bound value must be a literal.")
+      "Frame bound value must be a constant integer.")
   }
 
   test("row constructor") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 1e43654..950f152 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -243,7 +243,7 @@ class PlanParserSuite extends PlanTest {
     val sql = "select * from t"
     val plan = table("t").select(star())
     val spec = WindowSpecDefinition(Seq('a, 'b), Seq('c.asc),
-      SpecifiedWindowFrame(RowFrame, -Literal(1), Literal(1)))
+      SpecifiedWindowFrame(RowFrame, ValuePreceding(1), ValueFollowing(1)))
 
     // Test window resolution.
     val ws1 = Map("w1" -> spec, "w2" -> spec, "w3" -> spec)

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 363c1e2..06ef7bc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -436,22 +436,21 @@ class TreeNodeSuite extends SparkFunSuite {
         "bucketColumnNames" -> "[bucket]",
         "sortColumnNames" -> "[sort]"))
 
+    // Converts FrameBoundary to JSON
+    assertJSON(
+      ValueFollowing(3),
+      JObject(
+        "product-class" -> classOf[ValueFollowing].getName,
+        "value" -> 3))
+
     // Converts WindowFrame to JSON
     assertJSON(
-      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow),
-      List(
-        JObject(
-          "class" -> classOf[SpecifiedWindowFrame].getName,
-          "num-children" -> 2,
-          "frameType" -> JObject("object" -> JString(RowFrame.getClass.getName)),
-          "lower" -> 0,
-          "upper" -> 1),
-        JObject(
-          "class" -> UnboundedPreceding.getClass.getName,
-          "num-children" -> 0),
-        JObject(
-          "class" -> CurrentRow.getClass.getName,
-          "num-children" -> 0)))
+      SpecifiedWindowFrame(RowFrame, UnboundedFollowing, CurrentRow),
+      JObject(
+        "product-class" -> classOf[SpecifiedWindowFrame].getName,
+        "frameType" -> JObject("object" -> JString(RowFrame.getClass.getName)),
+        "frameStart" -> JObject("object" -> JString(UnboundedFollowing.getClass.getName)),
+        "frameEnd" -> JObject("object" -> JString(CurrentRow.getClass.getName))))
 
     // Converts Partitioning to JSON
     assertJSON(

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 9efbeee..950a679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.IntegerType
 
 /**
  * This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -108,50 +109,46 @@ case class WindowExec(
    *
    * This method uses Code Generation. It can only be used on the executor side.
    *
-   * @param frame to evaluate. This can either be a Row or Range frame.
-   * @param bound with respect to the row.
+   * @param frameType to evaluate. This can either be Row or Range based.
+   * @param offset with respect to the row.
    * @return a bound ordering object.
    */
-  private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
-    (frame, bound) match {
-      case (RowFrame, CurrentRow) =>
-        RowBoundOrdering(0)
-
-      case (RowFrame, IntegerLiteral(offset)) =>
-        RowBoundOrdering(offset)
-
-      case (RangeFrame, CurrentRow) =>
-        val ordering = newOrdering(orderSpec, child.output)
-        RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
-
-      case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
-        // Use only the first order expression when the offset is non-zero.
-        val sortExpr = orderSpec.head
-        val expr = sortExpr.child
-
-        // Create the projection which returns the current 'value'.
-        val current = newMutableProjection(expr :: Nil, child.output)
-
-        // Flip the sign of the offset when processing the order is descending
-        val boundOffset = sortExpr.direction match {
-          case Descending => UnaryMinus(offset)
-          case Ascending => offset
+  private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+    frameType match {
+      case RangeFrame =>
+        val (exprs, current, bound) = if (offset == 0) {
+          // Use the entire order expression when the offset is 0.
+          val exprs = orderSpec.map(_.child)
+          val buildProjection = () => newMutableProjection(exprs, child.output)
+          (orderSpec, buildProjection(), buildProjection())
+        } else if (orderSpec.size == 1) {
+          // Use only the first order expression when the offset is non-zero.
+          val sortExpr = orderSpec.head
+          val expr = sortExpr.child
+          // Create the projection which returns the current 'value'.
+          val current = newMutableProjection(expr :: Nil, child.output)
+          // Flip the sign of the offset when processing the order is descending
+          val boundOffset = sortExpr.direction match {
+            case Descending => -offset
+            case Ascending => offset
+          }
+          // Create the projection which returns the current 'value' modified by adding the offset.
+          val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
+          val bound = newMutableProjection(boundExpr :: Nil, child.output)
+          (sortExpr :: Nil, current, bound)
+        } else {
+          sys.error("Non-Zero range offsets are not supported for windows " +
+            "with multiple order expressions.")
         }
-
-        // Create the projection which returns the current 'value' modified by adding the offset.
-        val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
-        val bound = newMutableProjection(boundExpr :: Nil, child.output)
-
         // Construct the ordering. This is used to compare the result of current value projection
         // to the result of bound value projection. This is done manually because we want to use
         // Code Generation (if it is enabled).
-        val boundSortExprs = sortExpr.copy(BoundReference(0, expr.dataType, expr.nullable)) :: Nil
-        val ordering = newOrdering(boundSortExprs, Nil)
+        val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
+          SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
+        }
+        val ordering = newOrdering(sortExprs, Nil)
         RangeBoundOrdering(ordering, current, bound)
-
-      case (RangeFrame, _) =>
-        sys.error("Non-Zero range offsets are not supported for windows " +
-          "with multiple order expressions.")
+      case RowFrame => RowBoundOrdering(offset)
     }
   }
 
@@ -160,13 +157,13 @@ case class WindowExec(
    * WindowExpressions and factory function for the WindowFrameFunction.
    */
   private[this] lazy val windowFrameExpressionFactoryPairs = {
-    type FrameKey = (String, FrameType, Expression, Expression)
+    type FrameKey = (String, FrameType, Option[Int], Option[Int])
     type ExpressionBuffer = mutable.Buffer[Expression]
     val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
 
     // Add an expression and its window function to the map for a given frame.
     def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
-      val key = (tpe, fr.frameType, fr.lower, fr.upper)
+      val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
       val (es, fns) = framedFunctions.getOrElseUpdate(
         key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
       es += e
@@ -206,7 +203,7 @@ case class WindowExec(
         // Create the factory
         val factory = key match {
           // Offset Frame
-          case ("OFFSET", _, IntegerLiteral(offset), _) =>
+          case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
             target: InternalRow =>
               new OffsetWindowFunctionFrame(
                 target,
@@ -218,38 +215,38 @@ case class WindowExec(
                   newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
                 offset)
 
-          // Entire Partition Frame.
-          case ("AGGREGATE", _, UnboundedPreceding, UnboundedFollowing) =>
-            target: InternalRow => {
-              new UnboundedWindowFunctionFrame(target, processor)
-            }
-
           // Growing Frame.
-          case ("AGGREGATE", frameType, UnboundedPreceding, upper) =>
+          case ("AGGREGATE", frameType, None, Some(high)) =>
             target: InternalRow => {
               new UnboundedPrecedingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, high))
             }
 
           // Shrinking Frame.
-          case ("AGGREGATE", frameType, lower, UnboundedFollowing) =>
+          case ("AGGREGATE", frameType, Some(low), None) =>
             target: InternalRow => {
               new UnboundedFollowingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower))
+                createBoundOrdering(frameType, low))
             }
 
           // Moving Frame.
-          case ("AGGREGATE", frameType, lower, upper) =>
+          case ("AGGREGATE", frameType, Some(low), Some(high)) =>
             target: InternalRow => {
               new SlidingWindowFunctionFrame(
                 target,
                 processor,
-                createBoundOrdering(frameType, lower),
-                createBoundOrdering(frameType, upper))
+                createBoundOrdering(frameType, low),
+                createBoundOrdering(frameType, high))
+            }
+
+          // Entire Partition Frame.
+          case ("AGGREGATE", frameType, None, None) =>
+            target: InternalRow => {
+              new UnboundedWindowFunctionFrame(target, processor)
             }
         }
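
After this change WindowExec groups window expressions by integer frame keys:
the restored FrameBoundary extractor turns each bound into an Option[Int],
with None standing for an unbounded side. Roughly, the factory cases above
line up with keys like these (illustrative values only):

    import org.apache.spark.sql.catalyst.expressions._

    FrameBoundary(ValuePreceding(1))   // Some(-1)
    FrameBoundary(UnboundedFollowing)  // None
    // ("AGGREGATE", frameType, None,    Some(h))  -> growing frame
    // ("AGGREGATE", frameType, Some(l), None)     -> shrinking frame
    // ("AGGREGATE", frameType, Some(l), Some(h))  -> moving (sliding) frame
    // ("AGGREGATE", frameType, None,    None)     -> entire-partition frame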
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index e6cb889..6279d48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.expressions
 
 import org.apache.spark.annotation.InterfaceStability
-import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions._
 
 /**
@@ -123,24 +123,7 @@ class WindowSpec private[sql](
    */
   // Note: when updating the doc for this method, also update Window.rowsBetween.
   def rowsBetween(start: Long, end: Long): WindowSpec = {
-    val boundaryStart = start match {
-      case 0 => CurrentRow
-      case Long.MinValue => UnboundedPreceding
-      case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
-      case x => throw new AnalysisException(s"Boundary start is not a valid integer: $x")
-    }
-
-    val boundaryEnd = end match {
-      case 0 => CurrentRow
-      case Long.MaxValue => UnboundedFollowing
-      case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
-      case x => throw new AnalysisException(s"Boundary end is not a valid integer: $x")
-    }
-
-    new WindowSpec(
-      partitionSpec,
-      orderSpec,
-      SpecifiedWindowFrame(RowFrame, boundaryStart, boundaryEnd))
+    between(RowFrame, start, end)
   }
 
   /**
@@ -191,22 +174,28 @@ class WindowSpec private[sql](
    */
   // Note: when updating the doc for this method, also update Window.rangeBetween.
   def rangeBetween(start: Long, end: Long): WindowSpec = {
+    between(RangeFrame, start, end)
+  }
+
+  private def between(typ: FrameType, start: Long, end: Long): WindowSpec = {
     val boundaryStart = start match {
       case 0 => CurrentRow
       case Long.MinValue => UnboundedPreceding
-      case x => Literal(x)
+      case x if x < 0 => ValuePreceding(-start.toInt)
+      case x if x > 0 => ValueFollowing(start.toInt)
     }
 
     val boundaryEnd = end match {
       case 0 => CurrentRow
       case Long.MaxValue => UnboundedFollowing
-      case x => Literal(x)
+      case x if x < 0 => ValuePreceding(-end.toInt)
+      case x if x > 0 => ValueFollowing(end.toInt)
     }
 
     new WindowSpec(
       partitionSpec,
       orderSpec,
-      SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd))
+      SpecifiedWindowFrame(typ, boundaryStart, boundaryEnd))
   }
 
   /**
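
After the revert both rowsBetween and rangeBetween funnel through the private
between helper above, so any boundary other than the UNBOUNDED sentinels
(Long.MinValue / Long.MaxValue) is truncated to an Int again; accepting true
Long range boundaries was exactly what the reverted SPARK-19451 change added.
A usage sketch, assuming a DataFrame df with the val and cate columns used by
the window.sql tests below:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING within each category.
    val w = Window.partitionBy("cate").orderBy("val").rangeBetween(-1, 1)
    df.select(col("val"), col("cate"), sum("val").over(w))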

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/resources/sql-tests/inputs/window.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 342e571..c800fc3 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -1,44 +1,24 @@
 -- Test data.
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate);
+(null, "a"), (1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"), (null, null), (3, null)
+AS testData(val, cate);
 
 -- RowsBetween
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData
 ORDER BY cate, val;
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
 ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
 
 -- RangeBetween
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData
 ORDER BY cate, val;
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
 
 -- RangeBetween with reverse OrderBy
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
 
--- Invalid window frame
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val;
-
-
 -- Window functions
 SELECT val, cate,
 max(val) OVER w AS max,

http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/resources/sql-tests/results/window.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index 9751106..aa58561 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,12 +1,11 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 19
+-- Number of queries: 11
 
 
 -- !query 0
 CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate)
+(null, "a"), (1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"), (null, null), (3, null)
+AS testData(val, cate)
 -- !query 0 schema
 struct<>
 -- !query 0 output
@@ -48,21 +47,11 @@ NULL	a	1
 
 
 -- !query 3
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
--- !query 3 schema
-struct<>
--- !query 3 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN CURRENT ROW AND 2147483648L FOLLOWING' due to data type mismatch: The data type of the upper bound 'LongType does not match the expected data type 'IntegerType'.; line 1 pos 41
-
-
--- !query 4
 SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData
 ORDER BY cate, val
--- !query 4 schema
+-- !query 3 schema
 struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint>
--- !query 4 output
+-- !query 3 output
 NULL	NULL	0
 3	NULL	1
 NULL	a	0
@@ -74,12 +63,12 @@ NULL	a	0
 3	b	2
 
 
--- !query 5
+-- !query 4
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 5 schema
+-- !query 4 schema
 struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 5 output
+-- !query 4 output
 NULL	NULL	NULL
 3	NULL	3
 NULL	a	NULL
@@ -91,29 +80,12 @@ NULL	a	NULL
 3	b	3
 
 
--- !query 6
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
--- !query 6 schema
-struct<val_long:bigint,cate:string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint>
--- !query 6 output
-NULL	NULL	NULL
-1	NULL	1
-1	a	4
-1	a	4
-2	a	2147483652
-2147483650	a	2147483650
-NULL	b	NULL
-3	b	2147483653
-2147483650	b	2147483650
-
-
--- !query 7
+-- !query 5
 SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
 RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 7 schema
+-- !query 5 schema
 struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 7 output
+-- !query 5 output
 NULL	NULL	NULL
 3	NULL	3
 NULL	a	NULL
@@ -125,73 +97,7 @@ NULL	a	NULL
 3	b	5
 
 
--- !query 8
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 8 schema
-struct<>
--- !query 8 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not follow the lower bound 'unboundedfollowing$()'.; line 1 pos 33
-
-
--- !query 9
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 9 schema
-struct<>
--- !query 9 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33
-
-
--- !query 10
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 10 schema
-struct<>
--- !query 10 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
-
-
--- !query 11
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 11 schema
-struct<>
--- !query 11 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'DateType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33
-
-
--- !query 12
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
--- !query 12 schema
-struct<>
--- !query 12 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
-
-
--- !query 13
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
--- !query 13 schema
-struct<>
--- !query 13 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Frame bound value must be a literal.(line 2, pos 30)
-
-== SQL ==
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
-------------------------------^^^
-
-
--- !query 14
+-- !query 6
 SELECT val, cate,
 max(val) OVER w AS max,
 min(val) OVER w AS min,
@@ -218,9 +124,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct
 FROM testData
 WINDOW w AS (PARTITION BY cate ORDER BY val)
 ORDER BY cate, val
--- !query 14 schema
+-- !query 6 schema
 struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint>
--- !query 14 output
+-- !query 6 output
 NULL	NULL	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	1	1	0.5	0.0	1	1	NULL	NULL	0
 3	NULL	3	3	3	1	3	3.0	NaN	NULL	3	NULL	3	3	3	2	2	1.0	1.0	2	2	0.0	NaN	1
 NULL	a	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	1	1	0.25	0.0	1	1	NULL	NULL	0
@@ -232,11 +138,11 @@ NULL	a	NULL	NULL	NULL	0	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	1	1	0.25	0.
 3	b	3	1	1	3	6	2.0	1.0	1	1	1	3	3	3	3	3	1.0	1.0	2	3	0.6666666666666666	1.0	3
 
 
--- !query 15
+-- !query 7
 SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val
--- !query 15 schema
+-- !query 7 schema
 struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
--- !query 15 output
+-- !query 7 output
 NULL	NULL	NULL
 3	NULL	NULL
 NULL	a	NULL
@@ -248,20 +154,20 @@ NULL	a	NULL
 3	b	NULL
 
 
--- !query 16
+-- !query 8
 SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
--- !query 16 schema
+-- !query 8 schema
 struct<>
--- !query 16 output
+-- !query 8 output
 org.apache.spark.sql.AnalysisException
 Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
 
 
--- !query 17
+-- !query 9
 SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
--- !query 17 schema
+-- !query 9 schema
 struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
--- !query 17 output
+-- !query 9 output
 NULL	NULL	13	1.8571428571428572
 3	NULL	13	1.8571428571428572
 NULL	a	13	1.8571428571428572
@@ -273,7 +179,7 @@ NULL	a	13	1.8571428571428572
 3	b	13	1.8571428571428572
 
 
--- !query 18
+-- !query 10
 SELECT val, cate,
 first_value(false) OVER w AS first_value,
 first_value(true, true) OVER w AS first_value_ignore_null,
@@ -284,9 +190,9 @@ last_value(false, false) OVER w AS last_value_contain_null
 FROM testData
 WINDOW w AS ()
 ORDER BY cate, val
--- !query 18 schema
+-- !query 10 schema
 struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
--- !query 18 output
+-- !query 10 output
 NULL	NULL	false	true	false	false	true	false
 3	NULL	false	true	false	false	true	false
 NULL	a	false	true	false	false	true	false

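For context on the hunk above: the deleted golden-file entries (old !query 6 and !query 8 through !query 13) were the only coverage for range frames whose boundary exceeds Int.MaxValue and for the associated analysis errors. A minimal sketch of the removed !query 6, copied verbatim from the input file and wrapped in the programmatic entry point — `spark` and the `testData` temp view are assumed to exist as in the SQL test harness:

    // Sketch only: the query whose golden result is removed above.
    // `testData` exposes `val_long: bigint` and `cate: string`.
    spark.sql(
      """SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
        |RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING)
        |FROM testData ORDER BY cate, val_long""".stripMargin)

With the revert in place, frame boundaries are integer-based again, so this long-valued boundary is no longer part of the golden suite.
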
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9806e57..204858f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -151,48 +151,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
         Row(2.0d), Row(2.0d)))
   }
 
-  test("row between should accept integer values as boundary") {
-    val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
-      (3L, "2"), (2L, "1"), (2147483650L, "2"))
-      .toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483647))),
-      Seq(Row(1, 3), Row(1, 4), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))
-    )
-
-    val e = intercept[AnalysisException](
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483648L))))
-    assert(e.message.contains("Boundary end is not a valid integer: 2147483648"))
-  }
-
-  test("range between should accept integer/long values as boundary") {
-    val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
-      (3L, "2"), (2L, "1"), (2147483650L, "2"))
-      .toDF("key", "value")
-    df.createOrReplaceTempView("window_table")
-    checkAnswer(
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))),
-      Seq(Row(1, 3), Row(1, 3), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))
-    )
-    checkAnswer(
-      df.select(
-        $"key",
-        count("key").over(
-          Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
-      Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
-    )
-  }
-
   test("aggregation and rows between with unbounded") {
     val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
     df.createOrReplaceTempView("window_table")

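The two tests deleted above were the behavioral coverage for the reverted boundary handling in the DataFrame API. A condensed sketch of what they exercised — the values are copied verbatim from the removed test bodies, and a SparkSession `spark` with `spark.implicits._` in scope is assumed:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.count

    val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
      (3L, "2"), (2L, "1"), (2147483650L, "2")).toDF("key", "value")

    // Int-range row-frame boundary: valid both before and after this revert.
    df.select($"key", count("key").over(
      Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483647)))

    // Long-valued range boundary (> Int.MaxValue): the case added by
    // SPARK-19451 whose coverage this revert removes.
    df.select($"key", count("key").over(
      Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L)))
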
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
index 90f9059..149ce1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
@@ -98,27 +98,27 @@ class ExpressionSQLBuilderSuite extends SQLBuilderTest {
 
     checkSQL(
       WindowSpecDefinition('a.int :: Nil, Nil, frame),
-      s"(PARTITION BY `a` ${frame.sql})"
+      s"(PARTITION BY `a` $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition('a.int :: 'b.string :: Nil, Nil, frame),
-      s"(PARTITION BY `a`, `b` ${frame.sql})"
+      s"(PARTITION BY `a`, `b` $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition(Nil, 'a.int.asc :: Nil, frame),
-      s"(ORDER BY `a` ASC NULLS FIRST ${frame.sql})"
+      s"(ORDER BY `a` ASC NULLS FIRST $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition(Nil, 'a.int.asc :: 'b.string.desc :: Nil, frame),
-      s"(ORDER BY `a` ASC NULLS FIRST, `b` DESC NULLS LAST ${frame.sql})"
+      s"(ORDER BY `a` ASC NULLS FIRST, `b` DESC NULLS LAST $frame)"
     )
 
     checkSQL(
       WindowSpecDefinition('a.int :: 'b.string :: Nil, 'c.int.asc :: 'd.string.desc :: Nil, frame),
-      s"(PARTITION BY `a`, `b` ORDER BY `c` ASC NULLS FIRST, `d` DESC NULLS LAST ${frame.sql})"
+      s"(PARTITION BY `a`, `b` ORDER BY `c` ASC NULLS FIRST, `d` DESC NULLS LAST $frame)"
     )
   }
 

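The hunk above reverts the expected strings from `${frame.sql}` back to plain `$frame`: with SPARK-19451 backed out, the window frame is rendered through its `toString` rather than a dedicated `sql` method. A hedged sketch of the difference — the suite's actual `frame` value is declared above this hunk and not shown here, so the boundaries below are stand-ins:

    import org.apache.spark.sql.catalyst.expressions.{
      CurrentRow, RowFrame, SpecifiedWindowFrame, UnboundedPreceding}

    // Hypothetical frame; the real one comes from earlier in the suite.
    val frame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)
    // Post-revert, interpolation picks up toString, which is expected to
    // render as e.g. "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW".
    val expected = s"(PARTITION BY `a` $frame)"
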
