You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/22 14:54:11 UTC

[spark] branch branch-3.4 updated: [SPARK-42527][CONNECT] Scala Client add Window functions

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 921a633c86c [SPARK-42527][CONNECT] Scala Client add Window functions
921a633c86c is described below

commit 921a633c86ce57f4498f5f355af37f80db832298
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Wed Feb 22 10:53:15 2023 -0400

    [SPARK-42527][CONNECT] Scala Client add Window functions
    
    ### What changes were proposed in this pull request?
    This PR aims to add the window functions to the Scala Spark Connect client.
    
    ### Why are the changes needed?
    Provide same APIs in the Scala spark connect client as in the original Dataset API.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds new window functions to the Spark Connect Scala client.
    
    ### How was this patch tested?
    
    - Add new test
    - Manually checked connect-client-jvm and connect with Scala-2.13
    
    Closes #40120 from LuciferYang/window-functions.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit e2f65b5316ed1473518e2d79e89c9bed756029e9)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../scala/org/apache/spark/sql/functions.scala     | 256 ++++++++++++++++++++-
 .../org/apache/spark/sql/FunctionTestSuite.scala   |  14 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  39 ++++
 .../explain-results/function_cume_dist.explain     |   5 +
 .../explain-results/function_dense_rank.explain    |   5 +
 .../explain-results/function_lag.explain           |   5 +
 .../explain-results/function_lead.explain          |   5 +
 .../explain-results/function_nth_value.explain     |   5 +
 .../explain-results/function_ntile.explain         |   5 +
 .../explain-results/function_percent_rank.explain  |   5 +
 .../explain-results/function_rank.explain          |   5 +
 .../explain-results/function_row_number.explain    |   5 +
 .../query-tests/queries/function_cume_dist.json    |  32 +++
 .../queries/function_cume_dist.proto.bin           |   7 +
 .../query-tests/queries/function_dense_rank.json   |  32 +++
 .../queries/function_dense_rank.proto.bin          |   8 +
 .../query-tests/queries/function_lag.json          |  52 +++++
 .../query-tests/queries/function_lag.proto.bin     | Bin 0 -> 209 bytes
 .../query-tests/queries/function_lead.json         |  49 ++++
 .../query-tests/queries/function_lead.proto.bin    |  11 +
 .../query-tests/queries/function_nth_value.json    |  45 ++++
 .../queries/function_nth_value.proto.bin           |  10 +
 .../query-tests/queries/function_ntile.json        |  37 +++
 .../query-tests/queries/function_ntile.proto.bin   |   8 +
 .../query-tests/queries/function_percent_rank.json |  32 +++
 .../queries/function_percent_rank.proto.bin        |   7 +
 .../query-tests/queries/function_rank.json         |  32 +++
 .../query-tests/queries/function_rank.proto.bin    |   7 +
 .../query-tests/queries/function_row_number.json   |  32 +++
 .../queries/function_row_number.proto.bin          |   8 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |  20 ++
 31 files changed, 782 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 4996b5033e3..0fd35b570f8 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -100,6 +100,9 @@ object functions {
         .setValue(value)
     }
 
+  private val nullType =
+    proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build()
+
   /**
    * Creates a [[Column]] of literal value.
    *
@@ -129,7 +132,7 @@ object functions {
       case v: Array[Byte] => createLiteral(_.setBinary(ByteString.copyFrom(v)))
       case v: collection.mutable.WrappedArray[_] => lit(v.array)
       case v: LocalDate => createLiteral(_.setDate(v.toEpochDay.toInt))
-      case null => unsupported("Null literals not supported yet.")
+      case null => createLiteral(_.setNull(nullType))
       case _ => unsupported(s"literal $literal not supported (yet).")
     }
   }
@@ -895,6 +898,257 @@ object functions {
    */
   def var_pop(columnName: String): Column = var_pop(Column(columnName))
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Window functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Window function: returns the cumulative distribution of values within a window partition,
+   * i.e. the fraction of rows that are below the current row.
+   *
+   * {{{
+   *   N = total number of rows in the partition
+   *   cumeDist(x) = number of values before (and including) x / N
+   * }}}
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def cume_dist(): Column = Column.fn("cume_dist")
+
+  /**
+   * Window function: returns the rank of rows within a window partition, without any gaps.
+   *
+   * The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking
+   * sequence when there are ties. That is, if you were ranking a competition using dense_rank and
+   * had three people tie for second place, you would say that all three were in second place and
+   * that the next person came in third. Rank would give sequential numbers, so the person
+   * that came in third place (after the ties) would register as coming in fifth.
+   *
+   * This is equivalent to the DENSE_RANK function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def dense_rank(): Column = Column.fn("dense_rank")
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and `null`
+   * if there is less than `offset` rows before the current row. For example, an `offset` of one
+   * will return the previous row at any given point in the window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lag(e: Column, offset: Int): Column = lag(e, offset, null)
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and `null`
+   * if there is less than `offset` rows before the current row. For example, an `offset` of one
+   * will return the previous row at any given point in the window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lag(columnName: String, offset: Int): Column = lag(columnName, offset, null)
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there is less than `offset` rows before the current row. For example, an
+   * `offset` of one will return the previous row at any given point in the window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lag(columnName: String, offset: Int, defaultValue: Any): Column = {
+    lag(Column(columnName), offset, defaultValue)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there is less than `offset` rows before the current row. For example, an
+   * `offset` of one will return the previous row at any given point in the window partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lag(e: Column, offset: Int, defaultValue: Any): Column = {
+    lag(e, offset, defaultValue, false)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows before the current row, and
+   * `defaultValue` if there is less than `offset` rows before the current row. `ignoreNulls`
+   * determines whether null values of row are included in or eliminated from the calculation. For
+   * example, an `offset` of one will return the previous row at any given point in the window
+   * partition.
+   *
+   * This is equivalent to the LAG function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lag(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column =
+    Column.fn("lag", e, lit(offset), lit(defaultValue), lit(ignoreNulls))
+
+  /**
+   * Window function: returns the value that is `offset` rows after the current row, and `null` if
+   * there is less than `offset` rows after the current row. For example, an `offset` of one will
+   * return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lead(columnName: String, offset: Int): Column = {
+    lead(columnName, offset, null)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows after the current row, and `null` if
+   * there is less than `offset` rows after the current row. For example, an `offset` of one will
+   * return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lead(e: Column, offset: Int): Column = {
+    lead(e, offset, null)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `defaultValue` if there is less than `offset` rows after the current row. For example, an
+   * `offset` of one will return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lead(columnName: String, offset: Int, defaultValue: Any): Column = {
+    lead(Column(columnName), offset, defaultValue)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `defaultValue` if there is less than `offset` rows after the current row. For example, an
+   * `offset` of one will return the next row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lead(e: Column, offset: Int, defaultValue: Any): Column = {
+    lead(e, offset, defaultValue, false)
+  }
+
+  /**
+   * Window function: returns the value that is `offset` rows after the current row, and
+   * `defaultValue` if there is less than `offset` rows after the current row. `ignoreNulls`
+   * determines whether null values of row are included in or eliminated from the calculation. The
+   * default value of `ignoreNulls` is false. For example, an `offset` of one will return the next
+   * row at any given point in the window partition.
+   *
+   * This is equivalent to the LEAD function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def lead(e: Column, offset: Int, defaultValue: Any, ignoreNulls: Boolean): Column =
+    Column.fn("lead", e, lit(offset), lit(defaultValue), lit(ignoreNulls))
+
+  /**
+   * Window function: returns the value that is the `offset`th row of the window frame (counting
+   * from 1), and `null` if the size of window frame is less than `offset` rows.
+   *
+   * It will return the `offset`th non-null value it sees when ignoreNulls is set to true. If all
+   * values are null, then null is returned.
+   *
+   * This is equivalent to the nth_value function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def nth_value(e: Column, offset: Int, ignoreNulls: Boolean): Column =
+    Column.fn("nth_value", e, lit(offset), lit(ignoreNulls))
+
+  /**
+   * Window function: returns the value that is the `offset`th row of the window frame (counting
+   * from 1), and `null` if the size of window frame is less than `offset` rows.
+   *
+   * This is equivalent to the nth_value function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def nth_value(e: Column, offset: Int): Column =
+    Column.fn("nth_value", e, lit(offset))
+
+  /**
+   * Window function: returns the ntile group id (from 1 to `n` inclusive) in an ordered window
+   * partition. For example, if `n` is 4, the first quarter of the rows will get value 1, the
+   * second quarter will get 2, the third quarter will get 3, and the last quarter will get 4.
+   *
+   * This is equivalent to the NTILE function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def ntile(n: Int): Column = Column.fn("ntile", lit(n))
+
+  /**
+   * Window function: returns the relative rank (i.e. percentile) of rows within a window
+   * partition.
+   *
+   * This is computed by:
+   * {{{
+   *   (rank of row in its partition - 1) / (number of rows in the partition - 1)
+   * }}}
+   *
+   * This is equivalent to the PERCENT_RANK function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def percent_rank(): Column = Column.fn("percent_rank")
+
+  /**
+   * Window function: returns the rank of rows within a window partition.
+   *
+   * The difference between rank and dense_rank is that dense_rank leaves no gaps in ranking
+   * sequence when there are ties. That is, if you were ranking a competition using dense_rank and
+   * had three people tie for second place, you would say that all three were in second place and
+   * that the next person came in third. Rank would give sequential numbers, so the person
+   * that came in third place (after the ties) would register as coming in fifth.
+   *
+   * This is equivalent to the RANK function in SQL.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def rank(): Column = Column.fn("rank")
+
+  /**
+   * Window function: returns a sequential number starting at 1 within a window partition.
+   *
+   * @group window_funcs
+   * @since 3.4.0
+   */
+  def row_number(): Column = Column.fn("row_number")
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Non-aggregate functions
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index d600ac432a2..f9118b93ec5 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -171,6 +171,20 @@ class FunctionTestSuite extends ConnectFunSuite {
     window(a, "10 seconds"))
   testEquals("session_window", session_window(a, "1 second"), session_window(a, lit("1 second")))
   testEquals("bucket", bucket(lit(3), a), bucket(3, a))
+  testEquals(
+    "lag",
+    lag(a, 1),
+    lag("a", 1),
+    lag(a, 1, null),
+    lag("a", 1, null),
+    lag(a, 1, null, false))
+  testEquals(
+    "lead",
+    lead(a, 2),
+    lead("a", 2),
+    lead(a, 2, null),
+    lead("a", 2, null),
+    lead(a, 2, null, false))
 
   test("assert_true no message") {
     val e = assert_true(a).expr
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 4cd7bfa0887..42572f8427e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -1407,6 +1407,45 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     fn.bucket(3, Column("a"))
   }
 
+  functionTest("cume_dist") {
+    fn.cume_dist().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("dense_rank") {
+    fn.dense_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("lag") {
+    fn.lag(Column("g"), 1, null, ignoreNulls = true)
+      .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("lead") {
+    fn.lead(Column("g"), 2, "dv", ignoreNulls = true)
+      .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("nth_value") {
+    fn.nth_value(Column("g"), 3, ignoreNulls = true)
+      .over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("ntile") {
+    fn.ntile(4).over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("percent_rank") {
+    fn.percent_rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("rank") {
+    fn.rank().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
+  functionTest("row_number") {
+    fn.row_number().over(Window.partitionBy(Column("a")).orderBy(Column("id")))
+  }
+
   private def temporalFunctionTest(name: String)(f: => Column): Unit = {
     test("function " + name) {
       temporals.select(f)
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain
new file mode 100644
index 00000000000..4f15f83bb9f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cume_dist.explain
@@ -0,0 +1,5 @@
+Project [cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [cume_dist() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS cume_dist() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain
new file mode 100644
index 00000000000..0cce71ad1d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_dense_rank.explain
@@ -0,0 +1,5 @@
+Project [DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [dense_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS DENSE_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [id#0L, a#0]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain
new file mode 100644
index 00000000000..6d9d4e706ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lag.explain
@@ -0,0 +1,5 @@
+Project [lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0]
++- Project [g#0, a#0, id#0L, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0, lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0]
+   +- Window [lag(g#0, -1, null) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS lag(g, 1, NULL) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [g#0, a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain
new file mode 100644
index 00000000000..6c8ce180b79
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_lead.explain
@@ -0,0 +1,5 @@
+Project [lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0]
++- Project [g#0, a#0, id#0L, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0, lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0]
+   +- Window [lead(g#0, 2, dv) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 2)) AS lead(g, 2, dv) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 2 FOLLOWING)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [g#0, a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain
new file mode 100644
index 00000000000..69eb7872d52
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_nth_value.explain
@@ -0,0 +1,5 @@
+Project [nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [g#0, a#0, id#0L, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [nth_value(g#0, 3, true) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS nth_value(g, 3) ignore nulls OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [g#0, a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain
new file mode 100644
index 00000000000..349ac7bbe8b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_ntile.explain
@@ -0,0 +1,5 @@
+Project [ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [ntile(4) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS ntile(4) OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain
new file mode 100644
index 00000000000..012931bd2aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_percent_rank.explain
@@ -0,0 +1,5 @@
+Project [PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [percent_rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS PERCENT_RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [id#0L, a#0]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain
new file mode 100644
index 00000000000..b8d4b5ee756
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_rank.explain
@@ -0,0 +1,5 @@
+Project [RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [id#0L, a#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [rank(id#0L) windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS RANK() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [id#0L, a#0]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain
new file mode 100644
index 00000000000..d0c817f8894
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_row_number.explain
@@ -0,0 +1,5 @@
+Project [row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
++- Project [a#0, id#0L, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0, row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0]
+   +- Window [row_number() windowspecdefinition(a#0, id#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS row_number() OVER (PARTITION BY a ORDER BY id ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0], [a#0], [id#0L ASC NULLS FIRST]
+      +- Project [a#0, id#0L]
+         +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json
new file mode 100644
index 00000000000..4e22d94aa30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "cume_dist"
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin
new file mode 100644
index 00000000000..fe9c87a0a03
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cume_dist.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>&Z$
+

+	cume_dist
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json
new file mode 100644
index 00000000000..3cc81b32613
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "dense_rank"
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin
new file mode 100644
index 00000000000..2df47f4ce65
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_dense_rank.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z%
+
+
+dense_rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json
new file mode 100644
index 00000000000..ee529f00dc5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.json
@@ -0,0 +1,52 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "lag",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "g"
+              }
+            }, {
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "null": {
+                  "null": {
+                  }
+                }
+              }
+            }, {
+              "literal": {
+                "boolean": true
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin
new file mode 100644
index 00000000000..908b872ec53
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_lag.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json
new file mode 100644
index 00000000000..8c38eec6daf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.json
@@ -0,0 +1,49 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "lead",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "g"
+              }
+            }, {
+              "literal": {
+                "integer": 2
+              }
+            }, {
+              "literal": {
+                "string": "dv"
+              }
+            }, {
+              "literal": {
+                "boolean": true
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin
new file mode 100644
index 00000000000..45f9e784037
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_lead.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string><Z:
+#!
+lead
+g
+0
+jdv
+
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json
new file mode 100644
index 00000000000..4d14c28dfcf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.json
@@ -0,0 +1,45 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "nth_value",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "g"
+              }
+            }, {
+              "literal": {
+                "integer": 3
+              }
+            }, {
+              "literal": {
+                "boolean": true
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin
new file mode 100644
index 00000000000..51d26e4c70b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_nth_value.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>9Z7
+ 
+	nth_value
+g
+0
+
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json
new file mode 100644
index 00000000000..1cd06b27791
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.json
@@ -0,0 +1,37 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "ntile",
+            "arguments": [{
+              "literal": {
+                "integer": 4
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin
new file mode 100644
index 00000000000..6fec68ebb19
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_ntile.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>(Z&
+
+ntile
+0
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json
new file mode 100644
index 00000000000..3119e2af8b7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "percent_rank"
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin
new file mode 100644
index 00000000000..594de0f59b7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_percent_rank.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>)Z'
+
+percent_rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json
new file mode 100644
index 00000000000..ab199e7c6d2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "rank"
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin
new file mode 100644
index 00000000000..d15a9c6d0d6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_rank.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>!Z
+
+rank
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json
new file mode 100644
index 00000000000..2185a41e2fb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "row_number"
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }],
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "id"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin
new file mode 100644
index 00000000000..080d1fc35b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_row_number.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'Z%
+
+
+row_number
+a
+
+id
\ No newline at end of file
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4a02ab66ea8..a14d3632d28 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -974,6 +974,26 @@ class SparkConnectPlanner(val session: SparkSession) {
         }
         Some(NthValue(children(0), children(1), ignoreNulls))
 
+      case "lag" if fun.getArgumentsCount == 4 =>
+        // Lag does not have a constructor which accepts Expression typed 'ignoreNulls'
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val ignoreNulls = children.last match {
+          case Literal(bool: Boolean, BooleanType) => bool
+          case other =>
+            throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other")
+        }
+        Some(Lag(children.head, children(1), children(2), ignoreNulls))
+
+      case "lead" if fun.getArgumentsCount == 4 =>
+        // Lead does not have a constructor which accepts Expression typed 'ignoreNulls'
+        val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
+        val ignoreNulls = children.last match {
+          case Literal(bool: Boolean, BooleanType) => bool
+          case other =>
+            throw InvalidPlanInput(s"ignoreNulls should be a literal boolean, but got $other")
+        }
+        Some(Lead(children.head, children(1), children(2), ignoreNulls))
+
       case "window" if 2 <= fun.getArgumentsCount && fun.getArgumentsCount <= 4 =>
         val children = fun.getArgumentsList.asScala.toSeq.map(transformExpression)
         val timeCol = children.head


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org