You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/30 03:36:04 UTC
spark git commit: Revert "[SPARK-19451][SQL] rangeBetween method
should accept Long value as boundary"
Repository: spark
Updated Branches:
refs/heads/branch-2.2 66fa6bd6d -> e2062b9c1
Revert "[SPARK-19451][SQL] rangeBetween method should accept Long value as boundary"
This reverts commit 66fa6bd6d48b08625ecedfcb5a976678141300bd.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2062b9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2062b9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2062b9c
Branch: refs/heads/branch-2.2
Commit: e2062b9c1106433799d2874dfe17e181fe1ecb5e
Parents: 66fa6bd
Author: gatorsmile <ga...@gmail.com>
Authored: Sat Jul 29 20:35:22 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Jul 29 20:35:22 2017 -0700
----------------------------------------------------------------------
.../sql/catalyst/analysis/CheckAnalysis.scala | 15 +-
.../sql/catalyst/analysis/TypeCoercion.scala | 23 --
.../sql/catalyst/expressions/package.scala | 7 -
.../expressions/windowExpressions.scala | 328 +++++++++----------
.../spark/sql/catalyst/parser/AstBuilder.scala | 20 +-
.../spark/sql/catalyst/trees/TreeNode.scala | 2 +
.../catalyst/analysis/AnalysisErrorSuite.scala | 2 +-
.../catalyst/analysis/TypeCoercionSuite.scala | 36 --
.../catalyst/parser/ExpressionParserSuite.scala | 17 +-
.../sql/catalyst/parser/PlanParserSuite.scala | 2 +-
.../sql/catalyst/trees/TreeNodeSuite.scala | 27 +-
.../spark/sql/execution/window/WindowExec.scala | 103 +++---
.../spark/sql/expressions/WindowSpec.scala | 33 +-
.../test/resources/sql-tests/inputs/window.sql | 24 +-
.../resources/sql-tests/results/window.sql.out | 146 ++-------
.../sql/DataFrameWindowFunctionsSuite.scala | 42 ---
.../catalyst/ExpressionSQLBuilderSuite.scala | 10 +-
17 files changed, 304 insertions(+), 533 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2e300c5..2e3ac3e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -106,9 +106,11 @@ trait CheckAnalysis extends PredicateHelper {
case w @ WindowExpression(AggregateExpression(_, _, true, _), _) =>
failAnalysis(s"Distinct window functions are not supported: $w")
- case w @ WindowExpression(_: OffsetWindowFunction,
- WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
- if order.isEmpty || !frame.isOffset =>
+ case w @ WindowExpression(_: OffsetWindowFunction, WindowSpecDefinition(_, order,
+ SpecifiedWindowFrame(frame,
+ FrameBoundary(l),
+ FrameBoundary(h))))
+ if order.isEmpty || frame != RowFrame || l != h =>
failAnalysis("An offset window function can only be evaluated in an ordered " +
s"row-based window frame with a single offset: $w")
@@ -117,10 +119,15 @@ trait CheckAnalysis extends PredicateHelper {
// function.
e match {
case _: AggregateExpression | _: OffsetWindowFunction | _: AggregateWindowFunction =>
- w
case _ =>
failAnalysis(s"Expression '$e' not supported within a window function.")
}
+ // Make sure the window specification is valid.
+ s.validate match {
+ case Some(m) =>
+ failAnalysis(s"Window specification $s is not valid because $m")
+ case None => w
+ }
case s @ ScalarSubquery(query, conditions, _) =>
checkAnalysis(query)
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index a2fcad9..e1dd010 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -58,7 +58,6 @@ object TypeCoercion {
PropagateTypes ::
ImplicitTypeCasts ::
DateTimeOperations ::
- WindowFrameCoercion ::
Nil
// See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types.
@@ -786,26 +785,4 @@ object TypeCoercion {
Option(ret)
}
}
-
- /**
- * Cast WindowFrame boundaries to the type they operate upon.
- */
- object WindowFrameCoercion extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
- case s @ WindowSpecDefinition(_, Seq(order), SpecifiedWindowFrame(RangeFrame, lower, upper))
- if order.resolved =>
- s.copy(frameSpecification = SpecifiedWindowFrame(
- RangeFrame,
- createBoundaryCast(lower, order.dataType),
- createBoundaryCast(upper, order.dataType)))
- }
-
- private def createBoundaryCast(boundary: Expression, dt: DataType): Expression = {
- boundary match {
- case e: SpecialFrameBoundary => e
- case e: Expression if e.dataType != dt && Cast.canCast(e.dataType, dt) => Cast(e, dt)
- case _ => boundary
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 1a48995..4c8b177 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -75,13 +75,6 @@ package object expressions {
}
/**
- * An identity projection. This returns the input row.
- */
- object IdentityProjection extends Projection {
- override def apply(row: InternalRow): InternalRow = row
- }
-
- /**
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
* column of the new row. If the schema of the input row is specified, then the given expression
* will be bound to that schema.
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 62c4832..3719042 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.util.Locale
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, NoOp}
@@ -42,7 +43,34 @@ case class WindowSpecDefinition(
orderSpec: Seq[SortOrder],
frameSpecification: WindowFrame) extends Expression with WindowSpec with Unevaluable {
- override def children: Seq[Expression] = partitionSpec ++ orderSpec :+ frameSpecification
+ def validate: Option[String] = frameSpecification match {
+ case UnspecifiedFrame =>
+ Some("Found a UnspecifiedFrame. It should be converted to a SpecifiedWindowFrame " +
+ "during analysis. Please file a bug report.")
+ case frame: SpecifiedWindowFrame => frame.validate.orElse {
+ def checkValueBasedBoundaryForRangeFrame(): Option[String] = {
+ if (orderSpec.length > 1) {
+ // It is not allowed to have a value-based PRECEDING and FOLLOWING
+ // as the boundary of a Range Window Frame.
+ Some("This Range Window Frame only accepts at most one ORDER BY expression.")
+ } else if (orderSpec.nonEmpty && !orderSpec.head.dataType.isInstanceOf[NumericType]) {
+ Some("The data type of the expression in the ORDER BY clause should be a numeric type.")
+ } else {
+ None
+ }
+ }
+
+ (frame.frameType, frame.frameStart, frame.frameEnd) match {
+ case (RangeFrame, vp: ValuePreceding, _) => checkValueBasedBoundaryForRangeFrame()
+ case (RangeFrame, vf: ValueFollowing, _) => checkValueBasedBoundaryForRangeFrame()
+ case (RangeFrame, _, vp: ValuePreceding) => checkValueBasedBoundaryForRangeFrame()
+ case (RangeFrame, _, vf: ValueFollowing) => checkValueBasedBoundaryForRangeFrame()
+ case (_, _, _) => None
+ }
+ }
+ }
+
+ override def children: Seq[Expression] = partitionSpec ++ orderSpec
override lazy val resolved: Boolean =
childrenResolved && checkInputDataTypes().isSuccess &&
@@ -50,46 +78,23 @@ case class WindowSpecDefinition(
override def nullable: Boolean = true
override def foldable: Boolean = false
- override def dataType: DataType = throw new UnsupportedOperationException("dataType")
+ override def dataType: DataType = throw new UnsupportedOperationException
- override def checkInputDataTypes(): TypeCheckResult = {
- frameSpecification match {
- case UnspecifiedFrame =>
- TypeCheckFailure(
- "Cannot use an UnspecifiedFrame. This should have been converted during analysis. " +
- "Please file a bug report.")
- case f: SpecifiedWindowFrame if f.frameType == RangeFrame && !f.isUnbounded &&
- orderSpec.isEmpty =>
- TypeCheckFailure(
- "A range window frame cannot be used in an unordered window specification.")
- case f: SpecifiedWindowFrame if f.frameType == RangeFrame && f.isValueBound &&
- orderSpec.size > 1 =>
- TypeCheckFailure(
- s"A range window frame with value boundaries cannot be used in a window specification " +
- s"with multiple order by expressions: ${orderSpec.mkString(",")}")
- case f: SpecifiedWindowFrame if f.frameType == RangeFrame && f.isValueBound &&
- !isValidFrameType(f.valueBoundary.head.dataType) =>
- TypeCheckFailure(
- s"The data type '${orderSpec.head.dataType}' used in the order specification does " +
- s"not match the data type '${f.valueBoundary.head.dataType}' which is used in the " +
- "range frame.")
- case _ => TypeCheckSuccess
+ override def sql: String = {
+ val partition = if (partitionSpec.isEmpty) {
+ ""
+ } else {
+ "PARTITION BY " + partitionSpec.map(_.sql).mkString(", ") + " "
}
- }
- override def sql: String = {
- def toSql(exprs: Seq[Expression], prefix: String): Seq[String] = {
- Seq(exprs).filter(_.nonEmpty).map(_.map(_.sql).mkString(prefix, ", ", ""))
+ val order = if (orderSpec.isEmpty) {
+ ""
+ } else {
+ "ORDER BY " + orderSpec.map(_.sql).mkString(", ") + " "
}
- val elements =
- toSql(partitionSpec, "PARTITION BY ") ++
- toSql(orderSpec, "ORDER BY ") ++
- Seq(frameSpecification.sql)
- elements.mkString("(", " ", ")")
+ s"($partition$order${frameSpecification.toString})"
}
-
- private def isValidFrameType(ft: DataType): Boolean = orderSpec.head.dataType == ft
}
/**
@@ -101,26 +106,22 @@ case class WindowSpecReference(name: String) extends WindowSpec
/**
* The trait used to represent the type of a Window Frame.
*/
-sealed trait FrameType {
- def inputType: AbstractDataType
- def sql: String
-}
+sealed trait FrameType
/**
- * RowFrame treats rows in a partition individually. Values used in a row frame are considered
- * to be physical offsets.
+ * RowFrame treats rows in a partition individually. When a [[ValuePreceding]]
+ * or a [[ValueFollowing]] is used as its [[FrameBoundary]], the value is considered
+ * as a physical offset.
* For example, `ROW BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a 3-row frame,
* from the row precedes the current row to the row follows the current row.
*/
-case object RowFrame extends FrameType {
- override def inputType: AbstractDataType = IntegerType
- override def sql: String = "ROWS"
-}
+case object RowFrame extends FrameType
/**
- * RangeFrame treats rows in a partition as groups of peers. All rows having the same `ORDER BY`
- * ordering are considered as peers. Values used in a range frame are considered to be logical
- * offsets.
+ * RangeFrame treats rows in a partition as groups of peers.
+ * All rows having the same `ORDER BY` ordering are considered as peers.
+ * When a [[ValuePreceding]] or a [[ValueFollowing]] is used as its [[FrameBoundary]],
+ * the value is considered as a logical offset.
* For example, assuming the value of the current row's `ORDER BY` expression `expr` is `v`,
* `RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING` represents a frame containing rows whose values
* `expr` are in the range of [v-1, v+1].
@@ -128,144 +129,138 @@ case object RowFrame extends FrameType {
* If `ORDER BY` clause is not defined, all rows in the partition is considered as peers
* of the current row.
*/
-case object RangeFrame extends FrameType {
- override def inputType: AbstractDataType = NumericType
- override def sql: String = "RANGE"
-}
+case object RangeFrame extends FrameType
/**
- * The trait used to represent special boundaries used in a window frame.
+ * The trait used to represent the type of a Window Frame Boundary.
*/
-sealed trait SpecialFrameBoundary extends Expression with Unevaluable {
- override def children: Seq[Expression] = Nil
- override def dataType: DataType = NullType
- override def foldable: Boolean = false
- override def nullable: Boolean = false
-}
-
-/** UNBOUNDED boundary. */
-case object UnboundedPreceding extends SpecialFrameBoundary {
- override def sql: String = "UNBOUNDED PRECEDING"
-}
-
-case object UnboundedFollowing extends SpecialFrameBoundary {
- override def sql: String = "UNBOUNDED FOLLOWING"
-}
-
-/** CURRENT ROW boundary. */
-case object CurrentRow extends SpecialFrameBoundary {
- override def sql: String = "CURRENT ROW"
+sealed trait FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean
}
/**
- * The trait used to represent the a Window Frame.
+ * Extractor for making working with frame boundaries easier.
*/
-sealed trait WindowFrame extends Expression with Unevaluable {
- override def children: Seq[Expression] = Nil
- override def dataType: DataType = throw new UnsupportedOperationException("dataType")
- override def foldable: Boolean = false
- override def nullable: Boolean = false
+object FrameBoundary {
+ def apply(boundary: FrameBoundary): Option[Int] = unapply(boundary)
+ def unapply(boundary: FrameBoundary): Option[Int] = boundary match {
+ case CurrentRow => Some(0)
+ case ValuePreceding(offset) => Some(-offset)
+ case ValueFollowing(offset) => Some(offset)
+ case _ => None
+ }
}
-/** Used as a place holder when a frame specification is not defined. */
-case object UnspecifiedFrame extends WindowFrame
-
-/**
- * A specified Window Frame. The val lower/uppper can be either a foldable [[Expression]] or a
- * [[SpecialFrameBoundary]].
- */
-case class SpecifiedWindowFrame(
- frameType: FrameType,
- lower: Expression,
- upper: Expression)
- extends WindowFrame {
-
- override def children: Seq[Expression] = lower :: upper :: Nil
+/** UNBOUNDED PRECEDING boundary. */
+case object UnboundedPreceding extends FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean = other match {
+ case UnboundedPreceding => true
+ case vp: ValuePreceding => true
+ case CurrentRow => true
+ case vf: ValueFollowing => true
+ case UnboundedFollowing => true
+ }
- lazy val valueBoundary: Seq[Expression] =
- children.filterNot(_.isInstanceOf[SpecialFrameBoundary])
+ override def toString: String = "UNBOUNDED PRECEDING"
+}
- override def checkInputDataTypes(): TypeCheckResult = {
- // Check lower value.
- val lowerCheck = checkBoundary(lower, "lower")
- if (lowerCheck.isFailure) {
- return lowerCheck
- }
+/** <value> PRECEDING boundary. */
+case class ValuePreceding(value: Int) extends FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean = other match {
+ case UnboundedPreceding => false
+ case ValuePreceding(anotherValue) => value >= anotherValue
+ case CurrentRow => true
+ case vf: ValueFollowing => true
+ case UnboundedFollowing => true
+ }
- // Check upper value.
- val upperCheck = checkBoundary(upper, "upper")
- if (upperCheck.isFailure) {
- return upperCheck
- }
+ override def toString: String = s"$value PRECEDING"
+}
- // Check combination (of expressions).
- (lower, upper) match {
- case (l: Expression, u: Expression) if !isValidFrameBoundary(l, u) =>
- TypeCheckFailure(s"Window frame upper bound '$upper' does not followes the lower bound " +
- s"'$lower'.")
- case (l: SpecialFrameBoundary, _) => TypeCheckSuccess
- case (_, u: SpecialFrameBoundary) => TypeCheckSuccess
- case (l: Expression, u: Expression) if l.dataType != u.dataType =>
- TypeCheckFailure(
- s"Window frame bounds '$lower' and '$upper' do no not have the same data type: " +
- s"'${l.dataType.catalogString}' <> '${u.dataType.catalogString}'")
- case (l: Expression, u: Expression) if isGreaterThan(l, u) =>
- TypeCheckFailure(
- "The lower bound of a window frame must be less than or equal to the upper bound")
- case _ => TypeCheckSuccess
- }
+/** CURRENT ROW boundary. */
+case object CurrentRow extends FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean = other match {
+ case UnboundedPreceding => false
+ case vp: ValuePreceding => false
+ case CurrentRow => true
+ case vf: ValueFollowing => true
+ case UnboundedFollowing => true
}
- override def sql: String = {
- val lowerSql = boundarySql(lower)
- val upperSql = boundarySql(upper)
- s"${frameType.sql} BETWEEN $lowerSql AND $upperSql"
- }
+ override def toString: String = "CURRENT ROW"
+}
- def isUnbounded: Boolean = lower == UnboundedPreceding && upper == UnboundedFollowing
+/** <value> FOLLOWING boundary. */
+case class ValueFollowing(value: Int) extends FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean = other match {
+ case UnboundedPreceding => false
+ case vp: ValuePreceding => false
+ case CurrentRow => false
+ case ValueFollowing(anotherValue) => value <= anotherValue
+ case UnboundedFollowing => true
+ }
- def isValueBound: Boolean = valueBoundary.nonEmpty
+ override def toString: String = s"$value FOLLOWING"
+}
- def isOffset: Boolean = (lower, upper) match {
- case (l: Expression, u: Expression) => frameType == RowFrame && l == u
- case _ => false
+/** UNBOUNDED FOLLOWING boundary. */
+case object UnboundedFollowing extends FrameBoundary {
+ def notFollows(other: FrameBoundary): Boolean = other match {
+ case UnboundedPreceding => false
+ case vp: ValuePreceding => false
+ case CurrentRow => false
+ case vf: ValueFollowing => false
+ case UnboundedFollowing => true
}
- private def boundarySql(expr: Expression): String = expr match {
- case e: SpecialFrameBoundary => e.sql
- case UnaryMinus(n) => n.sql + " PRECEDING"
- case e: Expression => e.sql + " FOLLOWING"
- }
+ override def toString: String = "UNBOUNDED FOLLOWING"
+}
- private def isGreaterThan(l: Expression, r: Expression): Boolean = {
- GreaterThan(l, r).eval().asInstanceOf[Boolean]
- }
+/**
+ * The trait used to represent the a Window Frame.
+ */
+sealed trait WindowFrame
+
+/** Used as a place holder when a frame specification is not defined. */
+case object UnspecifiedFrame extends WindowFrame
- private def checkBoundary(b: Expression, location: String): TypeCheckResult = b match {
- case _: SpecialFrameBoundary => TypeCheckSuccess
- case e: Expression if !e.foldable =>
- TypeCheckFailure(s"Window frame $location bound '$e' is not a literal.")
- case e: Expression if !frameType.inputType.acceptsType(e.dataType) =>
- TypeCheckFailure(
- s"The data type of the $location bound '${e.dataType} does not match " +
- s"the expected data type '${frameType.inputType}'.")
- case _ => TypeCheckSuccess
+/** A specified Window Frame. */
+case class SpecifiedWindowFrame(
+ frameType: FrameType,
+ frameStart: FrameBoundary,
+ frameEnd: FrameBoundary) extends WindowFrame {
+
+ /** If this WindowFrame is valid or not. */
+ def validate: Option[String] = (frameType, frameStart, frameEnd) match {
+ case (_, UnboundedFollowing, _) =>
+ Some(s"$UnboundedFollowing is not allowed as the start of a Window Frame.")
+ case (_, _, UnboundedPreceding) =>
+ Some(s"$UnboundedPreceding is not allowed as the end of a Window Frame.")
+ // case (RowFrame, start, end) => ??? RowFrame specific rule
+ // case (RangeFrame, start, end) => ??? RangeFrame specific rule
+ case (_, start, end) =>
+ if (start.notFollows(end)) {
+ None
+ } else {
+ val reason =
+ s"The end of this Window Frame $end is smaller than the start of " +
+ s"this Window Frame $start."
+ Some(reason)
+ }
}
- private def isValidFrameBoundary(l: Expression, u: Expression): Boolean = {
- (l, u) match {
- case (UnboundedFollowing, _) => false
- case (_, UnboundedPreceding) => false
- case _ => true
- }
+ override def toString: String = frameType match {
+ case RowFrame => s"ROWS BETWEEN $frameStart AND $frameEnd"
+ case RangeFrame => s"RANGE BETWEEN $frameStart AND $frameEnd"
}
}
object SpecifiedWindowFrame {
/**
+ *
* @param hasOrderSpecification If the window spec has order by expressions.
* @param acceptWindowFrame If the window function accepts user-specified frame.
- * @return the default window frame.
+ * @return
*/
def defaultWindowFrame(
hasOrderSpecification: Boolean,
@@ -356,25 +351,20 @@ abstract class OffsetWindowFunction
override def nullable: Boolean = default == null || default.nullable || input.nullable
- override lazy val frame: WindowFrame = {
+ override lazy val frame = {
+ // This will be triggered by the Analyzer.
+ val offsetValue = offset.eval() match {
+ case o: Int => o
+ case x => throw new AnalysisException(
+ s"Offset expression must be a foldable integer expression: $x")
+ }
val boundary = direction match {
- case Ascending => offset
- case Descending => UnaryMinus(offset)
+ case Ascending => ValueFollowing(offsetValue)
+ case Descending => ValuePreceding(offsetValue)
}
SpecifiedWindowFrame(RowFrame, boundary, boundary)
}
- override def checkInputDataTypes(): TypeCheckResult = {
- val check = super.checkInputDataTypes()
- if (check.isFailure) {
- check
- } else if (!offset.foldable) {
- TypeCheckFailure(s"Offset expression '$offset' must be a literal.")
- } else {
- TypeCheckSuccess
- }
- }
-
override def dataType: DataType = input.dataType
override def inputTypes: Seq[AbstractDataType] =
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8ea18fa..d1c9332 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1142,26 +1142,32 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
/**
- * Create or resolve a frame boundary expressions.
+ * Create or resolve a [[FrameBoundary]]. Simple math expressions are allowed for Value
+ * Preceding/Following boundaries. These expressions must be constant (foldable) and return an
+ * integer value.
*/
- override def visitFrameBound(ctx: FrameBoundContext): Expression = withOrigin(ctx) {
- def value: Expression = {
+ override def visitFrameBound(ctx: FrameBoundContext): FrameBoundary = withOrigin(ctx) {
+ // We currently only allow foldable integers.
+ def value: Int = {
val e = expression(ctx.expression)
- validate(e.resolved && e.foldable, "Frame bound value must be a literal.", ctx)
- e
+ validate(e.resolved && e.foldable && e.dataType == IntegerType,
+ "Frame bound value must be a constant integer.",
+ ctx)
+ e.eval().asInstanceOf[Int]
}
+ // Create the FrameBoundary
ctx.boundType.getType match {
case SqlBaseParser.PRECEDING if ctx.UNBOUNDED != null =>
UnboundedPreceding
case SqlBaseParser.PRECEDING =>
- UnaryMinus(value)
+ ValuePreceding(value)
case SqlBaseParser.CURRENT =>
CurrentRow
case SqlBaseParser.FOLLOWING if ctx.UNBOUNDED != null =>
UnboundedFollowing
case SqlBaseParser.FOLLOWING =>
- value
+ ValueFollowing(value)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 6fb5a99..ae5c513 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -688,6 +688,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
case id: FunctionIdentifier => true
case spec: BucketSpec => true
case catalog: CatalogTable => true
+ case boundary: FrameBoundary => true
+ case frame: WindowFrame => true
case partition: Partitioning => true
case resource: FunctionResource => true
case broadcast: BroadcastMode => true
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index adc5032..5050318 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -191,7 +191,7 @@ class AnalysisErrorSuite extends AnalysisTest {
WindowSpecDefinition(
UnresolvedAttribute("a") :: Nil,
SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil,
- SpecifiedWindowFrame(RangeFrame, Literal(1), Literal(2)))).as('window)),
+ SpecifiedWindowFrame(RangeFrame, ValueFollowing(1), ValueFollowing(2)))).as('window)),
"window frame" :: "must match the required frame" :: Nil)
errorTest(
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 345ec0f..2624f558 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -998,42 +998,6 @@ class TypeCoercionSuite extends PlanTest {
EqualTo(Literal(Array(1, 2)), Literal("123")),
EqualTo(Literal(Array(1, 2)), Literal("123")))
}
-
- test("cast WindowFrame boundaries to the type they operate upon") {
- // Can cast frame boundaries to order dataType.
- ruleTest(WindowFrameCoercion,
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal(1L), Ascending)),
- SpecifiedWindowFrame(RangeFrame, Literal(3), Literal(2147483648L))),
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal(1L), Ascending)),
- SpecifiedWindowFrame(RangeFrame, Cast(3, LongType), Literal(2147483648L)))
- )
- // Cannot cast frame boundaries to order dataType.
- ruleTest(WindowFrameCoercion,
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal.default(DateType), Ascending)),
- SpecifiedWindowFrame(RangeFrame, Literal(10.0), Literal(2147483648L))),
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal.default(DateType), Ascending)),
- SpecifiedWindowFrame(RangeFrame, Literal(10.0), Literal(2147483648L)))
- )
- // Should not cast SpecialFrameBoundary.
- ruleTest(WindowFrameCoercion,
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal(1L), Ascending)),
- SpecifiedWindowFrame(RangeFrame, CurrentRow, UnboundedFollowing)),
- windowSpec(
- Seq(UnresolvedAttribute("a")),
- Seq(SortOrder(Literal(1L), Ascending)),
- SpecifiedWindowFrame(RangeFrame, CurrentRow, UnboundedFollowing))
- )
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index e770cc3..f062191 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -262,17 +262,16 @@ class ExpressionParserSuite extends PlanTest {
// Range/Row
val frameTypes = Seq(("rows", RowFrame), ("range", RangeFrame))
val boundaries = Seq(
- ("10 preceding", -Literal(10), CurrentRow),
- ("2147483648 preceding", -Literal(2147483648L), CurrentRow),
- ("3 + 1 following", Add(Literal(3), Literal(1)), CurrentRow),
+ ("10 preceding", ValuePreceding(10), CurrentRow),
+ ("3 + 1 following", ValueFollowing(4), CurrentRow), // Will fail during analysis
("unbounded preceding", UnboundedPreceding, CurrentRow),
("unbounded following", UnboundedFollowing, CurrentRow), // Will fail during analysis
("between unbounded preceding and current row", UnboundedPreceding, CurrentRow),
("between unbounded preceding and unbounded following",
UnboundedPreceding, UnboundedFollowing),
- ("between 10 preceding and current row", -Literal(10), CurrentRow),
- ("between current row and 5 following", CurrentRow, Literal(5)),
- ("between 10 preceding and 5 following", -Literal(10), Literal(5))
+ ("between 10 preceding and current row", ValuePreceding(10), CurrentRow),
+ ("between current row and 5 following", CurrentRow, ValueFollowing(5)),
+ ("between 10 preceding and 5 following", ValuePreceding(10), ValueFollowing(5))
)
frameTypes.foreach {
case (frameTypeSql, frameType) =>
@@ -284,9 +283,13 @@ class ExpressionParserSuite extends PlanTest {
}
}
+ // We cannot use non integer constants.
+ intercept("foo(*) over (partition by a order by b rows 10.0 preceding)",
+ "Frame bound value must be a constant integer.")
+
// We cannot use an arbitrary expression.
intercept("foo(*) over (partition by a order by b rows exp(b) preceding)",
- "Frame bound value must be a literal.")
+ "Frame bound value must be a constant integer.")
}
test("row constructor") {
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 1e43654..950f152 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -243,7 +243,7 @@ class PlanParserSuite extends PlanTest {
val sql = "select * from t"
val plan = table("t").select(star())
val spec = WindowSpecDefinition(Seq('a, 'b), Seq('c.asc),
- SpecifiedWindowFrame(RowFrame, -Literal(1), Literal(1)))
+ SpecifiedWindowFrame(RowFrame, ValuePreceding(1), ValueFollowing(1)))
// Test window resolution.
val ws1 = Map("w1" -> spec, "w2" -> spec, "w3" -> spec)
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 363c1e2..06ef7bc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -436,22 +436,21 @@ class TreeNodeSuite extends SparkFunSuite {
"bucketColumnNames" -> "[bucket]",
"sortColumnNames" -> "[sort]"))
+ // Converts FrameBoundary to JSON
+ assertJSON(
+ ValueFollowing(3),
+ JObject(
+ "product-class" -> classOf[ValueFollowing].getName,
+ "value" -> 3))
+
// Converts WindowFrame to JSON
assertJSON(
- SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow),
- List(
- JObject(
- "class" -> classOf[SpecifiedWindowFrame].getName,
- "num-children" -> 2,
- "frameType" -> JObject("object" -> JString(RowFrame.getClass.getName)),
- "lower" -> 0,
- "upper" -> 1),
- JObject(
- "class" -> UnboundedPreceding.getClass.getName,
- "num-children" -> 0),
- JObject(
- "class" -> CurrentRow.getClass.getName,
- "num-children" -> 0)))
+ SpecifiedWindowFrame(RowFrame, UnboundedFollowing, CurrentRow),
+ JObject(
+ "product-class" -> classOf[SpecifiedWindowFrame].getName,
+ "frameType" -> JObject("object" -> JString(RowFrame.getClass.getName)),
+ "frameStart" -> JObject("object" -> JString(UnboundedFollowing.getClass.getName)),
+ "frameEnd" -> JObject("object" -> JString(CurrentRow.getClass.getName))))
// Converts Partitioning to JSON
assertJSON(
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 9efbeee..950a679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.IntegerType
/**
* This class calculates and outputs (windowed) aggregates over the rows in a single (sorted)
@@ -108,50 +109,46 @@ case class WindowExec(
*
* This method uses Code Generation. It can only be used on the executor side.
*
- * @param frame to evaluate. This can either be a Row or Range frame.
- * @param bound with respect to the row.
+ * @param frameType to evaluate. This can either be Row or Range based.
+ * @param offset with respect to the row.
* @return a bound ordering object.
*/
- private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
- (frame, bound) match {
- case (RowFrame, CurrentRow) =>
- RowBoundOrdering(0)
-
- case (RowFrame, IntegerLiteral(offset)) =>
- RowBoundOrdering(offset)
-
- case (RangeFrame, CurrentRow) =>
- val ordering = newOrdering(orderSpec, child.output)
- RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
-
- case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
- // Use only the first order expression when the offset is non-null.
- val sortExpr = orderSpec.head
- val expr = sortExpr.child
-
- // Create the projection which returns the current 'value'.
- val current = newMutableProjection(expr :: Nil, child.output)
-
- // Flip the sign of the offset when processing the order is descending
- val boundOffset = sortExpr.direction match {
- case Descending => UnaryMinus(offset)
- case Ascending => offset
+ private[this] def createBoundOrdering(frameType: FrameType, offset: Int): BoundOrdering = {
+ frameType match {
+ case RangeFrame =>
+ val (exprs, current, bound) = if (offset == 0) {
+ // Use the entire order expression when the offset is 0.
+ val exprs = orderSpec.map(_.child)
+ val buildProjection = () => newMutableProjection(exprs, child.output)
+ (orderSpec, buildProjection(), buildProjection())
+ } else if (orderSpec.size == 1) {
+ // Use only the first order expression when the offset is non-null.
+ val sortExpr = orderSpec.head
+ val expr = sortExpr.child
+ // Create the projection which returns the current 'value'.
+ val current = newMutableProjection(expr :: Nil, child.output)
+ // Flip the sign of the offset when processing the order is descending
+ val boundOffset = sortExpr.direction match {
+ case Descending => -offset
+ case Ascending => offset
+ }
+ // Create the projection which returns the current 'value' modified by adding the offset.
+ val boundExpr = Add(expr, Cast(Literal.create(boundOffset, IntegerType), expr.dataType))
+ val bound = newMutableProjection(boundExpr :: Nil, child.output)
+ (sortExpr :: Nil, current, bound)
+ } else {
+ sys.error("Non-Zero range offsets are not supported for windows " +
+ "with multiple order expressions.")
}
-
- // Create the projection which returns the current 'value' modified by adding the offset.
- val boundExpr = Add(expr, Cast(boundOffset, expr.dataType))
- val bound = newMutableProjection(boundExpr :: Nil, child.output)
-
// Construct the ordering. This is used to compare the result of current value projection
// to the result of bound value projection. This is done manually because we want to use
// Code Generation (if it is enabled).
- val boundSortExprs = sortExpr.copy(BoundReference(0, expr.dataType, expr.nullable)) :: Nil
- val ordering = newOrdering(boundSortExprs, Nil)
+ val sortExprs = exprs.zipWithIndex.map { case (e, i) =>
+ SortOrder(BoundReference(i, e.dataType, e.nullable), e.direction)
+ }
+ val ordering = newOrdering(sortExprs, Nil)
RangeBoundOrdering(ordering, current, bound)
-
- case (RangeFrame, _) =>
- sys.error("Non-Zero range offsets are not supported for windows " +
- "with multiple order expressions.")
+ case RowFrame => RowBoundOrdering(offset)
}
}
@@ -160,13 +157,13 @@ case class WindowExec(
* WindowExpressions and factory function for the WindowFrameFunction.
*/
private[this] lazy val windowFrameExpressionFactoryPairs = {
- type FrameKey = (String, FrameType, Expression, Expression)
+ type FrameKey = (String, FrameType, Option[Int], Option[Int])
type ExpressionBuffer = mutable.Buffer[Expression]
val framedFunctions = mutable.Map.empty[FrameKey, (ExpressionBuffer, ExpressionBuffer)]
// Add a function and its function to the map for a given frame.
def collect(tpe: String, fr: SpecifiedWindowFrame, e: Expression, fn: Expression): Unit = {
- val key = (tpe, fr.frameType, fr.lower, fr.upper)
+ val key = (tpe, fr.frameType, FrameBoundary(fr.frameStart), FrameBoundary(fr.frameEnd))
val (es, fns) = framedFunctions.getOrElseUpdate(
key, (ArrayBuffer.empty[Expression], ArrayBuffer.empty[Expression]))
es += e
@@ -206,7 +203,7 @@ case class WindowExec(
// Create the factory
val factory = key match {
// Offset Frame
- case ("OFFSET", _, IntegerLiteral(offset), _) =>
+ case ("OFFSET", RowFrame, Some(offset), Some(h)) if offset == h =>
target: InternalRow =>
new OffsetWindowFunctionFrame(
target,
@@ -218,38 +215,38 @@ case class WindowExec(
newMutableProjection(expressions, schema, subexpressionEliminationEnabled),
offset)
- // Entire Partition Frame.
- case ("AGGREGATE", _, UnboundedPreceding, UnboundedFollowing) =>
- target: InternalRow => {
- new UnboundedWindowFunctionFrame(target, processor)
- }
-
// Growing Frame.
- case ("AGGREGATE", frameType, UnboundedPreceding, upper) =>
+ case ("AGGREGATE", frameType, None, Some(high)) =>
target: InternalRow => {
new UnboundedPrecedingWindowFunctionFrame(
target,
processor,
- createBoundOrdering(frameType, upper))
+ createBoundOrdering(frameType, high))
}
// Shrinking Frame.
- case ("AGGREGATE", frameType, lower, UnboundedFollowing) =>
+ case ("AGGREGATE", frameType, Some(low), None) =>
target: InternalRow => {
new UnboundedFollowingWindowFunctionFrame(
target,
processor,
- createBoundOrdering(frameType, lower))
+ createBoundOrdering(frameType, low))
}
// Moving Frame.
- case ("AGGREGATE", frameType, lower, upper) =>
+ case ("AGGREGATE", frameType, Some(low), Some(high)) =>
target: InternalRow => {
new SlidingWindowFunctionFrame(
target,
processor,
- createBoundOrdering(frameType, lower),
- createBoundOrdering(frameType, upper))
+ createBoundOrdering(frameType, low),
+ createBoundOrdering(frameType, high))
+ }
+
+ // Entire Partition Frame.
+ case ("AGGREGATE", frameType, None, None) =>
+ target: InternalRow => {
+ new UnboundedWindowFunctionFrame(target, processor)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index e6cb889..6279d48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.expressions
import org.apache.spark.annotation.InterfaceStability
-import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions._
/**
@@ -123,24 +123,7 @@ class WindowSpec private[sql](
*/
// Note: when updating the doc for this method, also update Window.rowsBetween.
def rowsBetween(start: Long, end: Long): WindowSpec = {
- val boundaryStart = start match {
- case 0 => CurrentRow
- case Long.MinValue => UnboundedPreceding
- case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
- case x => throw new AnalysisException(s"Boundary start is not a valid integer: $x")
- }
-
- val boundaryEnd = end match {
- case 0 => CurrentRow
- case Long.MaxValue => UnboundedFollowing
- case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
- case x => throw new AnalysisException(s"Boundary end is not a valid integer: $x")
- }
-
- new WindowSpec(
- partitionSpec,
- orderSpec,
- SpecifiedWindowFrame(RowFrame, boundaryStart, boundaryEnd))
+ between(RowFrame, start, end)
}
/**
@@ -191,22 +174,28 @@ class WindowSpec private[sql](
*/
// Note: when updating the doc for this method, also update Window.rangeBetween.
def rangeBetween(start: Long, end: Long): WindowSpec = {
+ between(RangeFrame, start, end)
+ }
+
+ private def between(typ: FrameType, start: Long, end: Long): WindowSpec = {
val boundaryStart = start match {
case 0 => CurrentRow
case Long.MinValue => UnboundedPreceding
- case x => Literal(x)
+ case x if x < 0 => ValuePreceding(-start.toInt)
+ case x if x > 0 => ValueFollowing(start.toInt)
}
val boundaryEnd = end match {
case 0 => CurrentRow
case Long.MaxValue => UnboundedFollowing
- case x => Literal(x)
+ case x if x < 0 => ValuePreceding(-end.toInt)
+ case x if x > 0 => ValueFollowing(end.toInt)
}
new WindowSpec(
partitionSpec,
orderSpec,
- SpecifiedWindowFrame(RangeFrame, boundaryStart, boundaryEnd))
+ SpecifiedWindowFrame(typ, boundaryStart, boundaryEnd))
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/resources/sql-tests/inputs/window.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/window.sql b/sql/core/src/test/resources/sql-tests/inputs/window.sql
index 342e571..c800fc3 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/window.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/window.sql
@@ -1,44 +1,24 @@
-- Test data.
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate);
+(null, "a"), (1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"), (null, null), (3, null)
+AS testData(val, cate);
-- RowsBetween
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val ROWS CURRENT ROW) FROM testData
ORDER BY cate, val;
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
-- RangeBetween
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData
ORDER BY cate, val;
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long;
-- RangeBetween with reverse OrderBy
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
--- Invalid window frame
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val;
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val;
-
-
-- Window functions
SELECT val, cate,
max(val) OVER w AS max,
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/resources/sql-tests/results/window.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/window.sql.out b/sql/core/src/test/resources/sql-tests/results/window.sql.out
index 9751106..aa58561 100644
--- a/sql/core/src/test/resources/sql-tests/results/window.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/window.sql.out
@@ -1,12 +1,11 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 19
+-- Number of queries: 11
-- !query 0
CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES
-(null, 1L, "a"), (1, 1L, "a"), (1, 2L, "a"), (2, 2147483650L, "a"), (1, null, "b"), (2, 3L, "b"),
-(3, 2147483650L, "b"), (null, null, null), (3, 1L, null)
-AS testData(val, val_long, cate)
+(null, "a"), (1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"), (null, null), (3, null)
+AS testData(val, cate)
-- !query 0 schema
struct<>
-- !query 0 output
@@ -48,21 +47,11 @@ NULL a 1
-- !query 3
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-ROWS BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
--- !query 3 schema
-struct<>
--- !query 3 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN CURRENT ROW AND 2147483648L FOLLOWING' due to data type mismatch: The data type of the upper bound 'LongType does not match the expected data type 'IntegerType'.; line 1 pos 41
-
-
--- !query 4
SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val RANGE 1 PRECEDING) FROM testData
ORDER BY cate, val
--- !query 4 schema
+-- !query 3 schema
struct<val:int,cate:string,count(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN 1 PRECEDING AND CURRENT ROW):bigint>
--- !query 4 output
+-- !query 3 output
NULL NULL 0
3 NULL 1
NULL a 0
@@ -74,12 +63,12 @@ NULL a 0
3 b 2
--- !query 5
+-- !query 4
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 5 schema
+-- !query 4 schema
struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 5 output
+-- !query 4 output
NULL NULL NULL
3 NULL 3
NULL a NULL
@@ -91,29 +80,12 @@ NULL a NULL
3 b 3
--- !query 6
-SELECT val_long, cate, sum(val_long) OVER(PARTITION BY cate ORDER BY val_long
-RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING) FROM testData ORDER BY cate, val_long
--- !query 6 schema
-struct<val_long:bigint,cate:string,sum(val_long) OVER (PARTITION BY cate ORDER BY val_long ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 2147483648 FOLLOWING):bigint>
--- !query 6 output
-NULL NULL NULL
-1 NULL 1
-1 a 4
-1 a 4
-2 a 2147483652
-2147483650 a 2147483650
-NULL b NULL
-3 b 2147483653
-2147483650 b 2147483650
-
-
--- !query 7
+-- !query 5
SELECT val, cate, sum(val) OVER(PARTITION BY cate ORDER BY val DESC
RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 7 schema
+-- !query 5 schema
struct<val:int,cate:string,sum(val) OVER (PARTITION BY cate ORDER BY val DESC NULLS LAST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING):bigint>
--- !query 7 output
+-- !query 5 output
NULL NULL NULL
3 NULL 3
NULL a NULL
@@ -125,73 +97,7 @@ NULL a NULL
3 b 5
--- !query 8
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 8 schema
-struct<>
--- !query 8 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'ROWS BETWEEN UNBOUNDED FOLLOWING AND 1 FOLLOWING' due to data type mismatch: Window frame upper bound '1' does not followes the lower bound 'unboundedfollowing$()'.; line 1 pos 33
-
-
--- !query 9
-SELECT val, cate, count(val) OVER(PARTITION BY cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 9 schema
-struct<>
--- !query 9 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame cannot be used in an unordered window specification.; line 1 pos 33
-
-
--- !query 10
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val, cate
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 10 schema
-struct<>
--- !query 10 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY testdata.`val` ASC NULLS FIRST, testdata.`cate` ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: A range window frame with value boundaries cannot be used in a window specification with multiple order by expressions: val#x ASC NULLS FIRST,cate#x ASC NULLS FIRST; line 1 pos 33
-
-
--- !query 11
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY current_date
-RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM testData ORDER BY cate, val
--- !query 11 schema
-struct<>
--- !query 11 output
-org.apache.spark.sql.AnalysisException
-cannot resolve '(PARTITION BY testdata.`cate` ORDER BY current_date() ASC NULLS FIRST RANGE BETWEEN CURRENT ROW AND 1 FOLLOWING)' due to data type mismatch: The data type 'DateType' used in the order specification does not match the data type 'IntegerType' which is used in the range frame.; line 1 pos 33
-
-
--- !query 12
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM testData ORDER BY cate, val
--- !query 12 schema
-struct<>
--- !query 12 output
-org.apache.spark.sql.AnalysisException
-cannot resolve 'RANGE BETWEEN 1 FOLLOWING AND 1 PRECEDING' due to data type mismatch: The lower bound of a window frame must be less than or equal to the upper bound; line 1 pos 33
-
-
--- !query 13
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
--- !query 13 schema
-struct<>
--- !query 13 output
-org.apache.spark.sql.catalyst.parser.ParseException
-
-Frame bound value must be a literal.(line 2, pos 30)
-
-== SQL ==
-SELECT val, cate, count(val) OVER(PARTITION BY cate ORDER BY val
-RANGE BETWEEN CURRENT ROW AND current_date PRECEDING) FROM testData ORDER BY cate, val
-------------------------------^^^
-
-
--- !query 14
+-- !query 6
SELECT val, cate,
max(val) OVER w AS max,
min(val) OVER w AS min,
@@ -218,9 +124,9 @@ approx_count_distinct(val) OVER w AS approx_count_distinct
FROM testData
WINDOW w AS (PARTITION BY cate ORDER BY val)
ORDER BY cate, val
--- !query 14 schema
+-- !query 6 schema
struct<val:int,cate:string,max:int,min:int,min:int,count:bigint,sum:bigint,avg:double,stddev:double,first_value:int,first_value_ignore_null:int,first_value_contain_null:int,last_value:int,last_value_ignore_null:int,last_value_contain_null:int,rank:int,dense_rank:int,cume_dist:double,percent_rank:double,ntile:int,row_number:int,var_pop:double,var_samp:double,approx_count_distinct:bigint>
--- !query 14 output
+-- !query 6 output
NULL NULL NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.5 0.0 1 1 NULL NULL 0
3 NULL 3 3 3 1 3 3.0 NaN NULL 3 NULL 3 3 3 2 2 1.0 1.0 2 2 0.0 NaN 1
NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0.0 1 1 NULL NULL 0
@@ -232,11 +138,11 @@ NULL a NULL NULL NULL 0 NULL NULL NULL NULL NULL NULL NULL NULL NULL 1 1 0.25 0.
3 b 3 1 1 3 6 2.0 1.0 1 1 1 3 3 3 3 3 1.0 1.0 2 3 0.6666666666666666 1.0 3
--- !query 15
+-- !query 7
SELECT val, cate, avg(null) OVER(PARTITION BY cate ORDER BY val) FROM testData ORDER BY cate, val
--- !query 15 schema
+-- !query 7 schema
struct<val:int,cate:string,avg(CAST(NULL AS DOUBLE)) OVER (PARTITION BY cate ORDER BY val ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):double>
--- !query 15 output
+-- !query 7 output
NULL NULL NULL
3 NULL NULL
NULL a NULL
@@ -248,20 +154,20 @@ NULL a NULL
3 b NULL
--- !query 16
+-- !query 8
SELECT val, cate, row_number() OVER(PARTITION BY cate) FROM testData ORDER BY cate, val
--- !query 16 schema
+-- !query 8 schema
struct<>
--- !query 16 output
+-- !query 8 output
org.apache.spark.sql.AnalysisException
Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;
--- !query 17
+-- !query 9
SELECT val, cate, sum(val) OVER(), avg(val) OVER() FROM testData ORDER BY cate, val
--- !query 17 schema
+-- !query 9 schema
struct<val:int,cate:string,sum(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):bigint,avg(CAST(val AS BIGINT)) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING):double>
--- !query 17 output
+-- !query 9 output
NULL NULL 13 1.8571428571428572
3 NULL 13 1.8571428571428572
NULL a 13 1.8571428571428572
@@ -273,7 +179,7 @@ NULL a 13 1.8571428571428572
3 b 13 1.8571428571428572
--- !query 18
+-- !query 10
SELECT val, cate,
first_value(false) OVER w AS first_value,
first_value(true, true) OVER w AS first_value_ignore_null,
@@ -284,9 +190,9 @@ last_value(false, false) OVER w AS last_value_contain_null
FROM testData
WINDOW w AS ()
ORDER BY cate, val
--- !query 18 schema
+-- !query 10 schema
struct<val:int,cate:string,first_value:boolean,first_value_ignore_null:boolean,first_value_contain_null:boolean,last_value:boolean,last_value_ignore_null:boolean,last_value_contain_null:boolean>
--- !query 18 output
+-- !query 10 output
NULL NULL false true false false true false
3 NULL false true false false true false
NULL a false true false false true false
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9806e57..204858f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -151,48 +151,6 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
Row(2.0d), Row(2.0d)))
}
- test("row between should accept integer values as boundary") {
- val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
- (3L, "2"), (2L, "1"), (2147483650L, "2"))
- .toDF("key", "value")
- df.createOrReplaceTempView("window_table")
- checkAnswer(
- df.select(
- $"key",
- count("key").over(
- Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483647))),
- Seq(Row(1, 3), Row(1, 4), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))
- )
-
- val e = intercept[AnalysisException](
- df.select(
- $"key",
- count("key").over(
- Window.partitionBy($"value").orderBy($"key").rowsBetween(0, 2147483648L))))
- assert(e.message.contains("Boundary end is not a valid integer: 2147483648"))
- }
-
- test("range between should accept integer/long values as boundary") {
- val df = Seq((1L, "1"), (1L, "1"), (2147483650L, "1"),
- (3L, "2"), (2L, "1"), (2147483650L, "2"))
- .toDF("key", "value")
- df.createOrReplaceTempView("window_table")
- checkAnswer(
- df.select(
- $"key",
- count("key").over(
- Window.partitionBy($"value").orderBy($"key").rangeBetween(0, 2147483648L))),
- Seq(Row(1, 3), Row(1, 3), Row(2, 2), Row(3, 2), Row(2147483650L, 1), Row(2147483650L, 1))
- )
- checkAnswer(
- df.select(
- $"key",
- count("key").over(
- Window.partitionBy($"value").orderBy($"key").rangeBetween(-2147483649L, 0))),
- Seq(Row(1, 2), Row(1, 2), Row(2, 3), Row(2147483650L, 2), Row(2147483650L, 4), Row(3, 1))
- )
- }
-
test("aggregation and rows between with unbounded") {
val df = Seq((1, "1"), (2, "2"), (2, "3"), (1, "3"), (3, "2"), (4, "3")).toDF("key", "value")
df.createOrReplaceTempView("window_table")
http://git-wip-us.apache.org/repos/asf/spark/blob/e2062b9c/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
index 90f9059..149ce1e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/ExpressionSQLBuilderSuite.scala
@@ -98,27 +98,27 @@ class ExpressionSQLBuilderSuite extends SQLBuilderTest {
checkSQL(
WindowSpecDefinition('a.int :: Nil, Nil, frame),
- s"(PARTITION BY `a` ${frame.sql})"
+ s"(PARTITION BY `a` $frame)"
)
checkSQL(
WindowSpecDefinition('a.int :: 'b.string :: Nil, Nil, frame),
- s"(PARTITION BY `a`, `b` ${frame.sql})"
+ s"(PARTITION BY `a`, `b` $frame)"
)
checkSQL(
WindowSpecDefinition(Nil, 'a.int.asc :: Nil, frame),
- s"(ORDER BY `a` ASC NULLS FIRST ${frame.sql})"
+ s"(ORDER BY `a` ASC NULLS FIRST $frame)"
)
checkSQL(
WindowSpecDefinition(Nil, 'a.int.asc :: 'b.string.desc :: Nil, frame),
- s"(ORDER BY `a` ASC NULLS FIRST, `b` DESC NULLS LAST ${frame.sql})"
+ s"(ORDER BY `a` ASC NULLS FIRST, `b` DESC NULLS LAST $frame)"
)
checkSQL(
WindowSpecDefinition('a.int :: 'b.string :: Nil, 'c.int.asc :: 'd.string.desc :: Nil, frame),
- s"(PARTITION BY `a`, `b` ORDER BY `c` ASC NULLS FIRST, `d` DESC NULLS LAST ${frame.sql})"
+ s"(PARTITION BY `a`, `b` ORDER BY `c` ASC NULLS FIRST, `d` DESC NULLS LAST $frame)"
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org