You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/22 14:54:11 UTC
[spark] branch branch-3.4 updated: [SPARK-42527][CONNECT] Scala Client add Window functions
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 921a633c86c [SPARK-42527][CONNECT] Scala Client add Window functions
921a633c86c is described below
commit 921a633c86ce57f4498f5f355af37f80db832298
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Feb 22 10:53:15 2023 -0400
[SPARK-42527][CONNECT] Scala Client add Window functions
### What changes were proposed in this pull request?
This PR aims to add the window functions to the Scala Spark Connect client.
### Why are the changes needed?
Provide same APIs in the Scala spark connect client as in the original Dataset API.
### Does this PR introduce _any_ user-facing change?
Yes, it adds new window functions to the Spark Connect Scala client.
### How was this patch tested?
- Add new test
- Manually checked connect-client-jvm and connect with Scala 2.13
Closes #40120 from LuciferYang/window-functions.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit e2f65b5316ed1473518e2d79e89c9bed756029e9)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../scala/org/apache/spark/sql/functions.scala | 256 ++++++++++++++++++++-
.../org/apache/spark/sql/FunctionTestSuite.scala | 14 ++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 39 ++++
.../explain-results/function_cume_dist.explain | 5 +
.../explain-results/function_dense_rank.explain | 5 +
.../explain-results/function_lag.explain | 5 +
.../explain-results/function_lead.explain | 5 +
.../explain-results/function_nth_value.explain | 5 +
.../explain-results/function_ntile.explain | 5 +
.../explain-results/function_percent_rank.explain | 5 +
.../explain-results/function_rank.explain | 5 +
.../explain-results/function_row_number.explain | 5 +
.../query-tests/queries/function_cume_dist.json | 32 +++
.../queries/function_cume_dist.proto.bin | 7 +
.../query-tests/queries/function_dense_rank.json | 32 +++
.../queries/function_dense_rank.proto.bin | 8 +
.../query-tests/queries/function_lag.json | 52 +++++
.../query-tests/queries/function_lag.proto.bin | Bin 0 -> 209 bytes
.../query-tests/queries/function_lead.json | 49 ++++
.../query-tests/queries/function_lead.proto.bin | 11 +
.../query-tests/queries/function_nth_value.json | 45 ++++
.../queries/function_nth_value.proto.bin | 10 +
.../query-tests/queries/function_ntile.json | 37 +++
.../query-tests/queries/function_ntile.proto.bin | 8 +
.../query-tests/queries/function_percent_rank.json | 32 +++
.../queries/function_percent_rank.proto.bin | 7 +
.../query-tests/queries/function_rank.json | 32 +++
.../query-tests/queries/function_rank.proto.bin | 7 +
.../query-tests/queries/function_row_number.json | 32 +++
.../queries/function_row_number.proto.bin | 8 +
.../sql/connect/planner/SparkConnectPlanner.scala | 20 ++
31 files changed, 782 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 4996b5033e3..0fd35b570f8 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -100,6 +100,9 @@ object functions {
.setValue(value)
}
+ private val nullType =
+ proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build()
+
/**
* Creates a [[Column]] of literal value.
*
@@ -129,7 +132,7 @@ object functions {
case v: Array[Byte] => createLiteral(_.setBinary(ByteString.copyFrom(v)))
case v: collection.mutable.WrappedArray[_] => lit(v.array)
case v: LocalDate => createLiteral(_.setDate(v.toEpochDay.toInt))
- case null => unsupported("Null literals not supported yet.")
+ case null => createLiteral(_.setNull(nullType))
case _ => unsupported(s"literal $literal not supported (yet).")
}
}
@@ -895,6 +898,257 @@ object functions {
*/
def var_pop(columnName: String): Column = var_pop(Column(columnName))
+ //////////////////////////////////////////////////////////////////////////////////////////////
+ // Window functions
+ //////////////////////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Window function: returns the cumulative distribution of values within a window partition,
+ * i.e. the fraction of rows that are below the current row.
+ *
+ * {{{
+ * N = total number of rows in the partition
+ * cumeDist(x) = number of values before (and including) x / N
+ * }}}
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def cume_dist(): Column = Column.fn("cume_dist")
+
+ /**
+ * Window function: returns the rank of rows within a window partition, without any gaps.
+ *
+ * The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking
+ * sequence when there are ties. That is, if you were ranking a competition using dense_rank and
+ * had three people tie for second place, you would say that all three were in second place and
+ * that the next person came in third. Rank, by contrast, would leave gaps after the ties, so
+ * the person that came in next (after the ties) would register as coming in fifth.
+ *
+ * This is equivalent to the DENSE_RANK function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def dense_rank(): Column = Column.fn("dense_rank")
+
+ /**
+ * Window function: returns the value that is `offset` rows before the current row, and `null`
+ * if there is less than `offset` rows before the current row. For example, an `offset` of one
+ * will return the previous row at any given point in the window partition.
+ *
+ * This is equivalent to the LAG function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lag(e: Column, offset: Int): Column = lag(e, offset, null)
+
+ /**
+ * Window function: returns the value that is `offset` rows before the current row, and `null`
+ * if there is less than `offset` rows before the current row. For example, an `offset` of one
+ * will return the previous row at any given point in the window partition.
+ *
+ * This is equivalent to the LAG function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lag(columnName: String, offset: Int): Column = lag(columnName, offset, null)
+
+ /**
+ * Window function: returns the value that is `offset` rows before the current row, and
+ * `defaultValue` if there is less than `offset` rows before the current row. For example, an
+ * `offset` of one will return the previous row at any given point in the window partition.
+ *
+ * This is equivalent to the LAG function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lag(columnName: String, offset: Int, defaultValue: Any): Column = {
+ lag(Column(columnName), offset, defaultValue)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows before the current row, and
+ * `defaultValue` if there is less than `offset` rows before the current row. For example, an
+ * `offset` of one will return the previous row at any given point in the window partition.
+ *
+ * This is equivalent to the LAG function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+ lag(e, offset, defaultValue, false)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows before the current row, and
+ * `defaultValue` if there is less than `offset` rows before the current row. `ignoreNulls`
+ * determines whether null values of row are included in or eliminated from the calculation. For
+ * example, an `offset` of one will return the previous row at any given point in the window
+ * partition.
+ *
+ * This is equivalent to the LAG function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lag(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column =
+ Column.fn("lag", e, lit(offset), lit(defaultValue), lit(ignoreNulls))
+
+ /**
+ * Window function: returns the value that is `offset` rows after the current row, and `null` if
+ * there is less than `offset` rows after the current row. For example, an `offset` of one will
+ * return the next row at any given point in the window partition.
+ *
+ * This is equivalent to the LEAD function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lead(columnName: String, offset: Int): Column = {
+ lead(columnName, offset, null)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows after the current row, and `null` if
+ * there is less than `offset` rows after the current row. For example, an `offset` of one will
+ * return the next row at any given point in the window partition.
+ *
+ * This is equivalent to the LEAD function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lead(e: Column, offset: Int): Column = {
+ lead(e, offset, null)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows after the current row, and
+ * `defaultValue` if there is less than `offset` rows after the current row. For example, an
+ * `offset` of one will return the next row at any given point in the window partition.
+ *
+ * This is equivalent to the LEAD function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lead(columnName: String, offset: Int, defaultValue: Any): Column = {
+ lead(Column(columnName), offset, defaultValue)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows after the current row, and
+ * `defaultValue` if there is less than `offset` rows after the current row. For example, an
+ * `offset` of one will return the next row at any given point in the window partition.
+ *
+ * This is equivalent to the LEAD function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lead(e: Column, offset: Int, defaultValue: Any): Column = {
+ lead(e, offset, defaultValue, false)
+ }
+
+ /**
+ * Window function: returns the value that is `offset` rows after the current row, and
+ * `defaultValue` if there is less than `offset` rows after the current row. `ignoreNulls`
+ * determines whether null values of row are included in or eliminated from the calculation. The
+ * default value of `ignoreNulls` is false. For example, an `offset` of one will return the next
+ * row at any given point in the window partition.
+ *
+ * This is equivalent to the LEAD function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def lead(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column =
+ Column.fn("lead", e, lit(offset), lit(defaultValue), lit(ignoreNulls))
+
+ /**
+ * Window function: returns the value that is the `offset`th row of the window frame (counting
+ * from 1), and `null` if the size of window frame is less than `offset` rows.
+ *
+ * It will return the `offset`th non-null value it sees when ignoreNulls is set to true. If all
+ * values are null, then null is returned.
+ *
+ * This is equivalent to the nth_value function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def nth_value(e: Column, offset: Int, ignoreNulls: Boolean): Column =
+ Column.fn("nth_value", e, lit(offset), lit(ignoreNulls))
+
+ /**
+ * Window function: returns the value that is the `offset`th row of the window frame (counting
+ * from 1), and `null` if the size of window frame is less than `offset` rows.
+ *
+ * This is equivalent to the nth_value function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def nth_value(e: Column, offset: Int): Column =
+ Column.fn("nth_value", e, lit(offset))
+
+ /**
+ * Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+ * partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the
+ * second quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+ *
+ * This is equivalent to the NTILE function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def ntile(n: Int): Column = Column.fn("ntile", lit(n))
+
+ /**
+ * Window function: returns the relative rank (i.e. percentile) of rows within a window
+ * partition.
+ *
+ * This is computed by:
+ * {{{
+ * (rank of row in its partition - 1) / (number of rows in the partition - 1)
+ * }}}
+ *
+ * This is equivalent to the PERCENT_RANK function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def percent_rank(): Column = Column.fn("percent_rank")
+
+ /**
+ * Window function: returns the rank of rows within a window partition.
+ *
+ * The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking
+ * sequence when there are ties. That is, if you were ranking a competition using dense_rank and
+ * had three people tie for second place, you would say that all three were in second place and
+ * that the next person came in third. Rank, by contrast, would leave gaps after the ties, so
+ * the person that came in next (after the ties) would register as coming in fifth.
+ *
+ * This is equivalent to the RANK function in SQL.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def rank(): Column = Column.fn("rank")
+
+ /**
+ * Window function: returns a sequential number starting at 1 within a window partition.
+ *
+ * @group window_funcs
+ * @since 3.4.0
+ */
+ def row_number(): Column = Column.fn("row_number")
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Non-aggregate functions
//////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index d600ac432a2..f9118b93ec5 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -171,6 +171,20 @@ class FunctionTestSuite extends ConnectFunSuite {
window(a, "10 seconds"))
testEquals("session_window", session_window(a, "1 second"), session_window(a, lit("1 second")))
testEquals("bucket", bucket(lit(3), a), bucket(3, a))
+ testEquals(
+ "lag",
+ lag(a, 1),
+ lag("a", 1),
+ lag(a, 1, null),
+ lag("a", 1, null),
+ lag(a, 1, null, false))
+ testEquals(
+ "lead",
+ lead(a, 2),
+ lead("a", 2),
+ lead(a, 2, null),
+ lead("a", 2, null),
+ lead(a, 2, null, false))
test("assert_true no message") {
val e = assert_true(a).expr
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 4cd7bfa0887..42572f8427e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -1407,6 +1407,45 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
fn.bucket(3, Column("a"))
}
+ functionTest("cume_dist") {
+ fn.cume_dist().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("dense_rank") {
+ fn.dense_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("lag") {
+ fn.lag(Column("g"), 1, null, ignoreNulls = true)
+ .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("lead") {
+ fn.lead(Column("g"), 2, "dv", ignoreNulls = true)
+ .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("nth_value") {
+ fn.nth_value(Column("g"), 3, ignoreNulls = true)
+ .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("ntile") {
+ fn.ntile(4).over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("percent_rank") {
+ fn.percent_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("rank") {
+ fn.rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
+ functionTest("row_number") {
+ fn.row_number().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+ }
+
private def temporalFunctionTest(name: String)(f: => Column): Unit = {
test("function " + name) {
temporals.select(f)
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain
new file mode 100644
index 00000000000..4f15f83bb9f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain
@@ -0,0 +1,5 @@
+Project [cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [cume_dist() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain
new file mode 100644
index 00000000000..0cce71ad1d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain
@@ -0,0 +1,5 @@
+Project [DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [dense_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [id#0L, a#0]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain
new file mode 100644
index 00000000000..6d9d4e706ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain
@@ -0,0 +1,5 @@
+Project [lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0]
++- Project [g#0, a#0, id#0L, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0]
+ +- Window [lag(g#0, -1, null) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [g#0, a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain
new file mode 100644
index 00000000000..6c8ce180b79
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain
@@ -0,0 +1,5 @@
+Project [lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0]
++- Project [g#0, a#0, id#0L, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0]
+ +- Window [lead(g#0, 2, dv) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 2)) AS lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [g#0, a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain
new file mode 100644
index 00000000000..69eb7872d52
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain
@@ -0,0 +1,5 @@
+Project [nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [g#0, a#0, id#0L, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [nth_value(g#0, 3, true) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [g#0, a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain
new file mode 100644
index 00000000000..349ac7bbe8b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain
@@ -0,0 +1,5 @@
+Project [ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [ntile(4) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain
new file mode 100644
index 00000000000..012931bd2aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain
@@ -0,0 +1,5 @@
+Project [PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [percent_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [id#0L, a#0]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain
new file mode 100644
index 00000000000..b8d4b5ee756
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain
@@ -0,0 +1,5 @@
+Project [RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [id#0L, a#0]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain
new file mode 100644
index 00000000000..d0c817f8894
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain
@@ -0,0 +1,5 @@
+Project [row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+ +- Window [row_number() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+ +- Project [a#0, id#0L]
+ +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json
new file mode 100644
index 00000000000..4e22d94aa30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json
@@ -0,0 +1,32 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "cume_dist"
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin
new file mode 100644
index 00000000000..fe9c87a0a03
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>&Z$
+
+ cume_dist
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json
new file mode 100644
index 00000000000..3cc81b32613
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json
@@ -0,0 +1,32 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "dense_rank"
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin
new file mode 100644
index 00000000000..2df47f4ce65
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z%
+
+
+dense_rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json
new file mode 100644
index 00000000000..ee529f00dc5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json
@@ -0,0 +1,52 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "lag",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "integer": 1
+ }
+ }, {
+ "literal": {
+ "null": {
+ "null": {
+ }
+ }
+ }
+ }, {
+ "literal": {
+ "boolean": true
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin
new file mode 100644
index 00000000000..908b872ec53
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json
new file mode 100644
index 00000000000..8c38eec6daf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json
@@ -0,0 +1,49 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "lead",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "integer": 2
+ }
+ }, {
+ "literal": {
+ "string": "dv"
+ }
+ }, {
+ "literal": {
+ "boolean": true
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin
new file mode 100644
index 00000000000..45f9e784037
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string><Z:
+#!
+lead
+g
+0
+jdv
+
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json
new file mode 100644
index 00000000000..4d14c28dfcf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json
@@ -0,0 +1,45 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "nth_value",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "integer": 3
+ }
+ }, {
+ "literal": {
+ "boolean": true
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin
new file mode 100644
index 00000000000..51d26e4c70b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>9Z7
+
+ nth_value
+g
+0
+
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json
new file mode 100644
index 00000000000..1cd06b27791
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json
@@ -0,0 +1,37 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "ntile",
+ "arguments": [{
+ "literal": {
+ "integer": 4
+ }
+ }]
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin
new file mode 100644
index 00000000000..6fec68ebb19
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>(Z&
+
+ntile
+0
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json
new file mode 100644
index 00000000000..3119e2af8b7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json
@@ -0,0 +1,32 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "percent_rank"
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin
new file mode 100644
index 00000000000..594de0f59b7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>)Z'
+
+percent_rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json
new file mode 100644
index 00000000000..ab199e7c6d2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json
@@ -0,0 +1,32 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "rank"
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin
new file mode 100644
index 00000000000..d15a9c6d0d6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>!Z
+
+rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json
new file mode 100644
index 00000000000..2185a41e2fb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json
@@ -0,0 +1,32 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "window": {
+ "windowFunction": {
+ "unresolvedFunction": {
+ "functionName": "row_number"
+ }
+ },
+ "partitionSpec": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "orderSpec": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin
new file mode 100644
index 00000000000..080d1fc35b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z%
+
+
+row_number
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4a02ab66ea8..a14d3632d28 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -974,6 +974,26 @@ class SparkConnectPlanner(val session: SparkSession) {
}
Some(NthValue(children(0), children(1), ignoreNulls))
+ case "lag" if fun.getArgumentsCount == 4 =>
+ // Lag does not have a constructor which accepts Expression typed 'ignoreNulls'
+ val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+ val ignoreNulls = children.last match {
+ case Literal(bool: Boolean, BooleanType) => bool
+ case other =>
+ throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other")
+ }
+ Some(Lag(children.head, children(1), children(2), ignoreNulls))
+
+ case "lead" if fun.getArgumentsCount == 4 =>
+ // Lead does not have a constructor which accepts Expression typed 'ignoreNulls'
+ val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+ val ignoreNulls = children.last match {
+ case Literal(bool: Boolean, BooleanType) => bool
+ case other =>
+ throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other")
+ }
+ Some(Lead(children.head, children(1), children(2), ignoreNulls))
+
case "window" if 2 <= fun.getArgumentsCount && fun.getArgumentsCount <= 4 =>
val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
val timeCol = children.head
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org