You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/22 07:19:21 UTC
[spark] branch master updated: [SPARK-42520][CONNECT] Support basic Window API in Scala client
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 054522b6762 [SPARK-42520][CONNECT] Support basic Window API in Scala client
054522b6762 is described below
commit 054522b67626aa1515b8f3f164ba7c063c38e5b8
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Wed Feb 22 15:19:00 2023 +0800
[SPARK-42520][CONNECT] Support basic Window API in Scala client
### What changes were proposed in this pull request?
Support Window orderby, partitionby, rowsbetween/rangebetween.
### Why are the changes needed?
API coverage
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
UT
Closes #40107 from amaliujia/rw-window-2.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Column.scala | 33 +++
.../org/apache/spark/sql/expressions/Window.scala | 241 +++++++++++++++++++++
.../apache/spark/sql/expressions/WindowSpec.scala | 240 ++++++++++++++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 11 +
.../query-tests/explain-results/window.explain | 8 +
.../test/resources/query-tests/queries/window.json | 205 ++++++++++++++++++
.../resources/query-tests/queries/window.proto.bin | 43 ++++
7 files changed, 781 insertions(+)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index c3e1113aa45..fde17963bfd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -24,6 +24,7 @@ import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DataType, Metadata}
@@ -1233,6 +1234,38 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logg
* @since 3.4.0
*/
def bitwiseXOR(other: Any): Column = fn("^", other)
+
+ /**
+ * Defines a windowing column.
+ *
+ * {{{
+ * val w = Window.partitionBy("name").orderBy("id")
+ * df.select(
+ * sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
+ * avg("price").over(w.rowsBetween(Window.currentRow, 4))
+ * )
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def over(window: expressions.WindowSpec): Column = {
+   // The spec builds the window expression and uses this column as its window function.
+   window.withAggregate(this)
+ }
+
+ /**
+ * Defines an empty analytic clause. In this case the analytic function is applied and presented
+ * for all rows in the result set.
+ *
+ * {{{
+ * df.select(
+ * sum("price").over(),
+ * avg("price").over()
+ * )
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def over(): Column = {
+   // Window.spec carries no partitioning, ordering or frame.
+   over(Window.spec)
+ }
}
private[sql] object Column {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala
new file mode 100644
index 00000000000..c85e7bc9c5c
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.expressions
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.Column
+
+/**
+ * Utility functions for defining window in DataFrames.
+ *
+ * {{{
+ * // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ * Window.partitionBy("country").orderBy("date")
+ * .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ *
+ * // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
+ * Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
+ * }}}
+ *
+ * @note
+ * When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding,
+ * unboundedFollowing) is used by default. When ordering is defined, a growing window frame
+ * (rangeFrame, unboundedPreceding, currentRow) is used by default.
+ *
+ * @since 3.4.0
+ */
+@Stable
+object Window {
+
+ /**
+ * Creates a [[WindowSpec]] with the partitioning defined.
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colName: String, colNames: String*): WindowSpec =
+   // Start from the empty spec so the result carries only this partitioning.
+   spec.partitionBy(colName, colNames: _*)
+
+ /**
+ * Creates a [[WindowSpec]] with the partitioning defined.
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(cols: Column*): WindowSpec =
+   // Start from the empty spec so the result carries only this partitioning.
+   spec.partitionBy(cols: _*)
+
+ /**
+ * Creates a [[WindowSpec]] with the ordering defined.
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(colName: String, colNames: String*): WindowSpec =
+   // Start from the empty spec so the result carries only this ordering.
+   spec.orderBy(colName, colNames: _*)
+
+ /**
+ * Creates a [[WindowSpec]] with the ordering defined.
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(cols: Column*): WindowSpec =
+   // Start from the empty spec so the result carries only this ordering.
+   spec.orderBy(cols: _*)
+
+ /**
+ * Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in
+ * SQL. This can be used to specify the frame boundaries:
+ *
+ * {{{
+ * Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def unboundedPreceding: Long = Long.MinValue
+
+ /**
+ * Value representing the last row in the partition, equivalent to "UNBOUNDED FOLLOWING" in SQL.
+ * This can be used to specify the frame boundaries:
+ *
+ * {{{
+ * Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def unboundedFollowing: Long = Long.MaxValue
+
+ /**
+ * Value representing the current row. This can be used to specify the frame boundaries:
+ *
+ * {{{
+ * Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ * }}}
+ *
+ * @since 3.4.0
+ */
+ def currentRow: Long = 0
+
+ /**
+ * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end`
+ * (inclusive).
+ *
+ * Both `start` and `end` are positions relative to the current row. For example, "0" means
+ * "current row", while "-1" means the row before the current row, and "5" means the fifth row
+ * after the current row.
+ *
+ * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+ * `Window.currentRow` to specify special boundary values, rather than using integral values
+ * directly.
+ *
+ * A row based boundary is based on the position of the row within the partition. An offset
+ * indicates the number of rows above or below the current row, the frame for the current row
+ * starts or ends. For instance, given a row based sliding frame with a lower bound offset of -1
+ * and an upper bound offset of +2, the frame for the row with index 5 ranges from index 4 to
+ * index 7.
+ *
+ * {{{
+ * import org.apache.spark.sql.expressions.Window
+ * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+ * .toDF("id", "category")
+ * val byCategoryOrderedById =
+ * Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1)
+ * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+ *
+ * +---+--------+---+
+ * | id|category|sum|
+ * +---+--------+---+
+ * | 1| b| 3|
+ * | 2| b| 5|
+ * | 3| b| 3|
+ * | 1| a| 2|
+ * | 1| a| 3|
+ * | 2| a| 2|
+ * +---+--------+---+
+ * }}}
+ *
+ * @param start
+ * boundary start, inclusive. The frame is unbounded if this is the minimum long value
+ * (`Window.unboundedPreceding`).
+ * @param end
+ * boundary end, inclusive. The frame is unbounded if this is the maximum long value
+ * (`Window.unboundedFollowing`).
+ * @since 3.4.0
+ */
+ // Note: when updating the doc for this method, also update WindowSpec.rowsBetween.
+ def rowsBetween(start: Long, end: Long): WindowSpec =
+   // Start from the empty spec so the result carries only this row frame.
+   spec.rowsBetween(start, end)
+
+ /**
+ * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end`
+ * (inclusive).
+ *
+ * Both `start` and `end` are relative to the current row. For example, "0" means "current row",
+ * while "-1" means one off before the current row, and "5" means the five off after the current
+ * row.
+ *
+ * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+ * `Window.currentRow` to specify special boundary values, rather than using long values
+ * directly.
+ *
+ * A range-based boundary is based on the actual value of the ORDER BY expression(s). An offset
+ * is used to alter the value of the ORDER BY expression, for instance if the current ORDER BY
+ * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for
+ * the current row will be 10 - 3 = 7. This however puts a number of constraints on the ORDER BY
+ * expressions: there can be only one expression and this expression must have a numerical data
+ * type. An exception can be made when the offset is unbounded, because no value modification is
+ * needed; in this case multiple and non-numeric ORDER BY expressions are allowed.
+ *
+ * {{{
+ * import org.apache.spark.sql.expressions.Window
+ * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+ * .toDF("id", "category")
+ * val byCategoryOrderedById =
+ * Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1)
+ * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+ *
+ * +---+--------+---+
+ * | id|category|sum|
+ * +---+--------+---+
+ * | 1| b| 3|
+ * | 2| b| 5|
+ * | 3| b| 3|
+ * | 1| a| 4|
+ * | 1| a| 4|
+ * | 2| a| 2|
+ * +---+--------+---+
+ * }}}
+ *
+ * @param start
+ * boundary start, inclusive. The frame is unbounded if this is the minimum long value
+ * (`Window.unboundedPreceding`).
+ * @param end
+ * boundary end, inclusive. The frame is unbounded if this is the maximum long value
+ * (`Window.unboundedFollowing`).
+ * @since 3.4.0
+ */
+ // Note: when updating the doc for this method, also update WindowSpec.rangeBetween.
+ def rangeBetween(start: Long, end: Long): WindowSpec =
+   // Start from the empty spec so the result carries only this range frame.
+   spec.rangeBetween(start, end)
+
+ /** Returns a fresh [[WindowSpec]] with no partitioning, no ordering and no frame. */
+ private[sql] def spec: WindowSpec =
+   new WindowSpec(partitionSpec = Seq.empty, orderSpec = Seq.empty, frame = None)
+
+}
+
+/**
+ * Utility functions for defining window in DataFrames.
+ *
+ * {{{
+ * // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ * Window.partitionBy("country").orderBy("date")
+ * .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ *
+ * // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
+ * Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
+ * }}}
+ *
+ * @since 3.4.0
+ */
+@Stable
+class Window private () // So we can see Window in JavaDoc.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
new file mode 100644
index 00000000000..cecfb6a0d91
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.expressions
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.Column
+
+/**
+ * A window specification that defines the partitioning, ordering, and frame boundaries.
+ *
+ * Use the static methods in [[Window]] to create a [[WindowSpec]].
+ *
+ * @since 3.4.0
+ */
+@Stable
+class WindowSpec private[sql] (
+ partitionSpec: Seq[proto.Expression],
+ orderSpec: Seq[proto.Expression.SortOrder],
+ frame: Option[proto.Expression.Window.WindowFrame]) {
+
+ /**
+ * Defines the partitioning columns in a [[WindowSpec]].
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(colName: String, colNames: String*): WindowSpec = {
+   // Resolve every name to a Column, then defer to the Column-based overload.
+   val columns = (colName +: colNames).map(name => Column(name))
+   partitionBy(columns: _*)
+ }
+
+ /**
+ * Defines the partitioning columns in a [[WindowSpec]].
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def partitionBy(cols: Column*): WindowSpec = {
+   // Replaces any earlier partitioning; ordering and frame are carried over unchanged.
+   val partitionExprs = cols.map(column => column.expr)
+   new WindowSpec(partitionExprs, orderSpec, frame)
+ }
+
+ /**
+ * Defines the ordering columns in a [[WindowSpec]].
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(colName: String, colNames: String*): WindowSpec = {
+   // Resolve every name to a Column, then defer to the Column-based overload.
+   val columns = (colName +: colNames).map(name => Column(name))
+   orderBy(columns: _*)
+ }
+
+ /**
+ * Defines the ordering columns in a [[WindowSpec]].
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(cols: Column*): WindowSpec = {
+   // Replaces any earlier ordering; partitioning and frame are carried over unchanged.
+   new WindowSpec(partitionSpec, cols.map(_.sortOrder), frame)
+ }
+
+ /**
+ * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+ *
+ * Both `start` and `end` are relative positions from the current row. For example, "0" means
+ * "current row", while "-1" means the row before the current row, and "5" means the fifth row
+ * after the current row.
+ *
+ * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+ * `Window.currentRow` to specify special boundary values, rather than using integral values
+ * directly.
+ *
+ * A row based boundary is based on the position of the row within the partition. An offset
+ * indicates the number of rows above or below the current row, the frame for the current row
+ * starts or ends. For instance, given a row based sliding frame with a lower bound offset of -1
+ * and an upper bound offset of +2, the frame for the row with index 5 ranges from index 4 to
+ * index 7.
+ *
+ * {{{
+ * import org.apache.spark.sql.expressions.Window
+ * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+ * .toDF("id", "category")
+ * val byCategoryOrderedById =
+ * Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1)
+ * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+ *
+ * +---+--------+---+
+ * | id|category|sum|
+ * +---+--------+---+
+ * | 1| b| 3|
+ * | 2| b| 5|
+ * | 3| b| 3|
+ * | 1| a| 2|
+ * | 1| a| 3|
+ * | 2| a| 2|
+ * +---+--------+---+
+ * }}}
+ *
+ * @param start
+ * boundary start, inclusive. The frame is unbounded if this is the minimum long value
+ * (`Window.unboundedPreceding`).
+ * @param end
+ * boundary end, inclusive. The frame is unbounded if this is the maximum long value
+ * (`Window.unboundedFollowing`).
+ * @since 3.4.0
+ */
+ // Note: when updating the doc for this method, also update Window.rowsBetween.
+ def rowsBetween(start: Long, end: Long): WindowSpec = {
+   // Row frames send literal offsets as integers, so the Int-range check is requested.
+   val rowFrame = toWindowFrame(
+     proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_ROW,
+     start,
+     end,
+     isRowBetween = true)
+   new WindowSpec(partitionSpec, orderSpec, Some(rowFrame))
+ }
+
+ /**
+ * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+ *
+ * Both `start` and `end` are relative from the current row. For example, "0" means "current
+ * row", while "-1" means one off before the current row, and "5" means the five off after the
+ * current row.
+ *
+ * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+ * `Window.currentRow` to specify special boundary values, rather than using long values
+ * directly.
+ *
+ * A range-based boundary is based on the actual value of the ORDER BY expression(s). An offset
+ * is used to alter the value of the ORDER BY expression, for instance if the current order by
+ * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for
+ * the current row will be 10 - 3 = 7. This however puts a number of constraints on the ORDER BY
+ * expressions: there can be only one expression and this expression must have a numerical data
+ * type. An exception can be made when the offset is unbounded, because no value modification is
+ * needed; in this case multiple and non-numeric ORDER BY expressions are allowed.
+ *
+ * {{{
+ * import org.apache.spark.sql.expressions.Window
+ * val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+ * .toDF("id", "category")
+ * val byCategoryOrderedById =
+ * Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1)
+ * df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+ *
+ * +---+--------+---+
+ * | id|category|sum|
+ * +---+--------+---+
+ * | 1| b| 3|
+ * | 2| b| 5|
+ * | 3| b| 3|
+ * | 1| a| 4|
+ * | 1| a| 4|
+ * | 2| a| 2|
+ * +---+--------+---+
+ * }}}
+ *
+ * @param start
+ * boundary start, inclusive. The frame is unbounded if this is the minimum long value
+ * (`Window.unboundedPreceding`).
+ * @param end
+ * boundary end, inclusive. The frame is unbounded if this is the maximum long value
+ * (`Window.unboundedFollowing`).
+ * @since 3.4.0
+ */
+ // Note: when updating the doc for this method, also update Window.rangeBetween.
+ def rangeBetween(start: Long, end: Long): WindowSpec = {
+   // Range frames send literal offsets as longs, so no Int-range check is requested.
+   val rangeFrame = toWindowFrame(
+     proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_RANGE,
+     start,
+     end,
+     isRowBetween = false)
+   new WindowSpec(partitionSpec, orderSpec, Some(rangeFrame))
+ }
+
+ /**
+  * Wraps `aggregate` in a window expression that carries this spec's partitioning, ordering
+  * and, when one was set, frame.
+  */
+ private[sql] def withAggregate(aggregate: Column): Column = {
+   Column { exprBuilder =>
+     val window = exprBuilder.getWindowBuilder
+     window.setWindowFunction(aggregate.expr)
+     // A spec without an explicit frame leaves the frame field unset.
+     frame.foreach(frameSpec => window.setFrameSpec(frameSpec))
+     window
+       .addAllPartitionSpec(partitionSpec.asJava)
+       .addAllOrderSpec(orderSpec.asJava)
+   }
+ }
+
+ /**
+  * Encodes a frame type and its two boundaries as a protobuf window frame.
+  *
+  * A boundary of `0` is sent as "current row"; `Long.MinValue` as the lower bound and
+  * `Long.MaxValue` as the upper bound are sent as "unbounded". Any other value is sent as a
+  * literal: an integer for row frames, a long for range frames.
+  *
+  * @param frameType
+  *   the frame type recorded on the resulting frame.
+  * @param start
+  *   lower boundary, relative to the current row.
+  * @param end
+  *   upper boundary, relative to the current row.
+  * @param isRowBetween
+  *   true for row frames, whose literal boundaries must fit in an `Int`.
+  * @throws UnsupportedOperationException
+  *   if a row-frame boundary is neither a special value nor within the `Int` range.
+  */
+ private[sql] def toWindowFrame(
+     frameType: proto.Expression.Window.WindowFrame.FrameType,
+     start: Long,
+     end: Long,
+     isRowBetween: Boolean): proto.Expression.Window.WindowFrame = {
+   val windowFrameBuilder = proto.Expression.Window.WindowFrame.newBuilder()
+   windowFrameBuilder.setFrameType(frameType)
+   start match {
+     case 0 => windowFrameBuilder.getLowerBuilder.setCurrentRow(true)
+     case Long.MinValue => windowFrameBuilder.getLowerBuilder.setUnbounded(true)
+     case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue =>
+       windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder
+         .setInteger(x.toInt)
+     case x if !isRowBetween =>
+       windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder.setLong(x)
+     case x =>
+       // Only reachable for row frames: the offset does not fit in an integer literal.
+       throw new UnsupportedOperationException(
+         s"Boundary start is not a valid integer: $x")
+   }
+
+   end match {
+     case 0 => windowFrameBuilder.getUpperBuilder.setCurrentRow(true)
+     case Long.MaxValue => windowFrameBuilder.getUpperBuilder.setUnbounded(true)
+     case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue =>
+       windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder
+         .setInteger(x.toInt)
+     case x if !isRowBetween =>
+       windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder.setLong(x)
+     case x =>
+       // Only reachable for row frames: the offset does not fit in an integer literal.
+       throw new UnsupportedOperationException(
+         s"Boundary end is not a valid integer: $x")
+   }
+
+   windowFrameBuilder.build()
+ }
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b9ae66b2b9e..4cd7bfa0887 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{functions => fn}
import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
@@ -1645,4 +1646,14 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
}
/* Window API */
+ test("window") {
+ // Covers partitionBy/orderBy (Column and String overloads), row and range frames, and over().
+ simple.select(
+ fn.min("id").over(Window.partitionBy(Column("a"), Column("b"))),
+ fn.min("id").over(Window.partitionBy("a", "b")),
+ fn.min("id").over(Window.orderBy(Column("a"), Column("b"))),
+ fn.min("id").over(Window.orderBy("a", "b")),
+ fn.min("id").over(Window.orderBy("a").rowsBetween(2L, 3L)),
+ fn.min("id").over(Window.orderBy("a").rangeBetween(2L, 3L)),
+ fn.count(Column("id")).over())
+ }
}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain
new file mode 100644
index 00000000000..6f816957925
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain
@@ -0,0 +1,8 @@
+Project [min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING [...]
++- Project [id#0L, a#0, b#0, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS [...]
+ +- Window [count(id#0L) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L]
+ +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 3)) AS min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, cast(2 as int), cast(3 as int))) AS min(id) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L], [a#0 ASC NULLS FIRST]
+ +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST [...]
+ +- Window [min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L], [a#0, b#0]
+ +- Project [id#0L, a#0, b#0]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.json b/connector/connect/common/src/test/resources/query-tests/queries/window.json
new file mode 100644
index 00000000000..8649a9d6e54
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/window.json
@@ -0,0 +1,205 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "frameSpec": {
+ "frameType": "FRAME_TYPE_ROW",
+ "lower": {
+ "value": {
+ "literal": {
+ "integer": 2
+ }
+ }
+ },
+ "upper": {
+ "value": {
+ "literal": {
+ "integer": 3
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "min",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "frameSpec": {
+ "frameType": "FRAME_TYPE_RANGE",
+ "lower": {
+ "value": {
+ "literal": {
+ "long": "2"
+ }
+ }
+ },
+ "upper": {
+ "value": {
+ "literal": {
+ "long": "3"
+ }
+ }
+ }
+ }
+ }
+ }, {
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "count",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ }
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin
new file mode 100644
index 00000000000..caa9d66934c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin
@@ -0,0 +1,43 @@
+�
+$Z" struct<id:bigint,a:int,b:double>!Z
+
+min
+id
+a
+b!Z
+
+min
+id
+a
+b-Z+
+
+min
+id
+
+a
+
+b-Z+
+
+min
+id
+
+a
+
+b4Z2
+
+min
+id
+
+a"
+0
+04Z2
+
+min
+id
+
+a"
+8
+8Z
+
+count
+id
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org