Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/15 14:57:05 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #29604: [SPARK-27951][SQL] Support ANSI SQL NTH_VALUE window function

cloud-fan commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r488719491



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the

Review comment:
       nit: we can remove it as it just repeats the usage doc.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in

Review comment:
       `offset - a positive int literal to indicate the offset in the window frame. It starts at 1.`

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)

Review comment:
       BTW, which window frames does `OffsetWindowFunction` support?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is ${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       and `count` should start from 0, so that we at least update `result` once.

##########
File path: sql/core/src/test/resources/sql-tests/inputs/window.sql
##########
@@ -124,4 +144,26 @@ WINDOW w AS (PARTITION BY cate ORDER BY val);
 -- with filter predicate
 SELECT val, cate,
 count(val) FILTER (WHERE val > 1) OVER(PARTITION BY cate)
-FROM testData ORDER BY cate, val;
\ No newline at end of file
+FROM testData ORDER BY cate, val;
+
+-- nth_value() over ()
+SELECT
+    employee_name,
+    salary,
+    nth_value(employee_name, 2) OVER (ORDER BY salary DESC) second_highest_salary
+FROM
+    basic_pays
+ORDER BY salary DESC;
+
+SELECT
+	employee_name,
+	department,
+	salary,
+	NTH_VALUE(employee_name, 2) OVER  (
+		PARTITION BY department
+		ORDER BY salary DESC
+		RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

Review comment:
       can we test a few more frame boundary variants (e.g. other ROWS/RANGE bounds)?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null

Review comment:
       This doesn't match the usage doc: ```If the value of `input` at the `offset`th row is null, null is returned```

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is ${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)

Review comment:
       I don't think it's worth adding an extra boolean slot just to save the calculation of `count >= offset`.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)

Review comment:
       can we add a TODO to optimize it using `OffsetWindowFunction`?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
##########
@@ -549,6 +549,96 @@ case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
   override def prettyName: String = "cume_dist"
 }
 
+/**
+ * The NthValue function returns the value of `input` at the `offset`th row from beginning of the
+ * window frame. Offset starts at 1. When the value of `input` is null at the `offset`th row or
+ * there is no such an `offset`th row, null is returned.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
+      from beginning of the window frame. Offsets start at 1. If the value of `input` at the
+      `offset`th row is null, null is returned. If there is no such an offset row (e.g., when the
+      offset is 10, size of the window frame less than 10), null is returned.
+  """,
+  arguments = """
+    Arguments:
+      * input - the target column or expression that the function operates on.
+      * offset - an int expression which determines the row number relative to the first row in
+          the window for which to return the expression. The offset can be a constant or an
+          expression and must be a positive integer that is greater than 0.
+      * ignoreNulls - an optional specification that indicates the NthValue should skip null
+          values in the determination of which row to use.
+  """,
+  since = "3.1.0",
+  group = "window_funcs")
+case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
+    extends AggregateWindowFunction with ImplicitCastInputTypes {
+
+  def this(child: Expression, offset: Expression) = this(child, offset, false)
+
+  override def children: Seq[Expression] = input :: Nil
+
+  override def frame: WindowFrame = UnspecifiedFrame
+
+  override def dataType: DataType = input.dataType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val check = super.checkInputDataTypes()
+    if (check.isFailure) {
+      check
+    } else if (!offsetExpr.foldable) {
+      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
+    } else {
+      offsetExpr.dataType match {
+        case IntegerType | ShortType | ByteType =>
+          offsetExpr.eval().asInstanceOf[Int] match {
+            case i: Int if i <= 0 => TypeCheckFailure(
+              s"The 'offset' argument of nth_value must be greater than zero but it is $i.")
+            case _ => TypeCheckSuccess
+          }
+        case _ => TypeCheckFailure(
+          s"The 'offset' parameter must be a int literal but it is ${offsetExpr.dataType}.")
+      }
+    }
+  }
+
+  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
+  private lazy val result = AttributeReference("result", input.dataType)()
+  private lazy val count = AttributeReference("count", LongType)()
+  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
+  override lazy val aggBufferAttributes: Seq[AttributeReference] =
+    result :: count :: valueSet :: Nil
+
+  override lazy val initialValues: Seq[Literal] = Seq(
+    /* result = */ Literal.create(null, input.dataType),
+    /* count = */ Literal(1L),
+    /* valueSet = */ Literal.create(false, BooleanType)
+  )
+
+  override lazy val updateExpressions: Seq[Expression] = {
+    if (ignoreNulls) {
+      Seq(
+        /* result = */ If(valueSet || input.isNull || count < offset, result, input),
+        /* count = */ If(input.isNull, count, count + 1L),
+        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
+      )
+    } else {
+      Seq(
+        /* result = */ If(valueSet || count < offset, result, input),
+        /* count = */ count + 1L,
+        /* valueSet = */ valueSet || count >= offset

Review comment:
       I'd expect something like
   ```
   Seq(
     /* result = */ If(count < offset, input, result),
     /* count = */ count + 1L
   )
   ```
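
   For reference, a minimal standalone model of what the simplified two-slot buffer could look like (assuming `valueSet` is dropped and `count` starts at 0, per the comments above). This is only an illustrative sketch, not the PR's code, and the helper name is made up; it writes `result` only on the exact offset-th row so that a frame shorter than `offset` still yields null:

   ```
   // Illustrative sketch only (not the PR's code): a two-slot buffer without `valueSet`,
   // with `count` starting at 0, that still returns null when the frame has fewer than
   // `offset` rows, because `result` is written exactly once, on the offset-th row.
   def nthValueSketch[T](frame: Seq[T], offset: Int): Option[T] = {
     var result: Option[T] = None        // "result" slot, initially null
     var count = 0L                      // "count" slot, starting at 0
     for (input <- frame) {
       if (count == offset - 1) result = Some(input)  // write only on the offset-th row
       count += 1L                                    // mirrors `count + 1L`
     }
     result                              // None when the frame has fewer than `offset` rows
   }

   // nthValueSketch(Seq("a", "b", "c"), 2) == Some("b")
   // nthValueSketch(Seq("a"), 2)           == None
   ```

   The `ignoreNulls` variant would only advance `count` on non-null inputs, mirroring `If(input.isNull, count, count + 1L)` in the PR.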




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org