Posted to commits@spark.apache.org by we...@apache.org on 2023/02/22 07:19:45 UTC

[spark] branch branch-3.4 updated: [SPARK-42520][CONNECT] Support basic Window API in Scala client

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new c123c85aa88 [SPARK-42520][CONNECT] Support basic Window API in Scala client
c123c85aa88 is described below

commit c123c85aa88ad10e04ab9b53f6bab20e510175ac
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Wed Feb 22 15:19:00 2023 +0800

    [SPARK-42520][CONNECT] Support basic Window API in Scala client
    
    ### What changes were proposed in this pull request?
    
    Support Window `orderBy`, `partitionBy`, and `rowsBetween`/`rangeBetween` in the Scala client.
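    
    For example (a minimal sketch mirroring the new scaladoc; `df` is assumed to be any
    DataFrame with `name`, `id`, and `price` columns):
    
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions.{avg, sum}
    
        val w = Window.partitionBy("name").orderBy("id")
        df.select(
          sum("price").over(w.rowsBetween(Window.currentRow, 2)),
          avg("price").over(w.rangeBetween(Window.unboundedPreceding, Window.currentRow)))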
    
    ### Why are the changes needed?
    
    API coverage for the Spark Connect Scala client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT (a new `window` plan-generation test in `PlanGenerationTestSuite`).
    
    Closes #40107 from amaliujia/rw-window-2.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 054522b67626aa1515b8f3f164ba7c063c38e5b8)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Column.scala   |  33 +++
 .../org/apache/spark/sql/expressions/Window.scala  | 241 +++++++++++++++++++++
 .../apache/spark/sql/expressions/WindowSpec.scala  | 240 ++++++++++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  11 +
 .../query-tests/explain-results/window.explain     |   8 +
 .../test/resources/query-tests/queries/window.json | 205 ++++++++++++++++++
 .../resources/query-tests/queries/window.proto.bin |  43 ++++
 7 files changed, 781 insertions(+)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index c3e1113aa45..fde17963bfd 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -24,6 +24,7 @@ import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.{DataType, Metadata}
 
@@ -1233,6 +1234,38 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logg
    * @since 3.4.0
    */
   def bitwiseXOR(other: Any): Column = fn("^", other)
+
+  /**
+   * Defines a windowing column.
+   *
+   * {{{
+   *   val w = Window.partitionBy("name").orderBy("id")
+   *   df.select(
+   *     sum("price").over(w.rangeBetween(Window.unboundedPreceding, 2)),
+   *     avg("price").over(w.rowsBetween(Window.currentRow, 4))
+   *   )
+   * }}}
+   *
+   * @group expr_ops
+   * @since 3.4.0
+   */
+  def over(window: expressions.WindowSpec): Column = window.withAggregate(this)
+
+  /**
+   * Defines an empty analytic clause. In this case the analytic function is applied over all
+   * rows in the result set.
+   *
+   * {{{
+   *   df.select(
+   *     sum("price").over(),
+   *     avg("price").over()
+   *   )
+   * }}}
+   *
+   * @group expr_ops
+   * @since 3.4.0
+   */
+  def over(): Column = over(Window.spec)
 }
 
 private[sql] object Column {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala
new file mode 100644
index 00000000000..c85e7bc9c5c
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/Window.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.expressions
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.Column
+
+/**
+ * Utility functions for defining windows in DataFrames.
+ *
+ * {{{
+ *   // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ *   Window.partitionBy("country").orderBy("date")
+ *     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ *
+ *   // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
+ *   Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
+ * }}}
+ *
+ * @note
+ *   When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding,
+ *   unboundedFollowing) is used by default. When ordering is defined, a growing window frame
+ *   (rangeFrame, unboundedPreceding, currentRow) is used by default.
+ *
+ * @since 3.4.0
+ */
+@Stable
+object Window {
+
+  /**
+   * Creates a [[WindowSpec]] with the partitioning defined.
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colName: String, colNames: String*): WindowSpec = {
+    spec.partitionBy(colName, colNames: _*)
+  }
+
+  /**
+   * Creates a [[WindowSpec]] with the partitioning defined.
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(cols: Column*): WindowSpec = {
+    spec.partitionBy(cols: _*)
+  }
+
+  /**
+   * Creates a [[WindowSpec]] with the ordering defined.
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(colName: String, colNames: String*): WindowSpec = {
+    spec.orderBy(colName, colNames: _*)
+  }
+
+  /**
+   * Creates a [[WindowSpec]] with the ordering defined.
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(cols: Column*): WindowSpec = {
+    spec.orderBy(cols: _*)
+  }
+
+  /**
+   * Value representing the first row in the partition, equivalent to "UNBOUNDED PRECEDING" in
+   * SQL. This can be used to specify the frame boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def unboundedPreceding: Long = Long.MinValue
+
+  /**
+   * Value representing the last row in the partition, equivalent to "UNBOUNDED FOLLOWING" in SQL.
+   * This can be used to specify the frame boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def unboundedFollowing: Long = Long.MaxValue
+
+  /**
+   * Value representing the current row. This can be used to specify the frame boundaries:
+   *
+   * {{{
+   *   Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
+   * }}}
+   *
+   * @since 3.4.0
+   */
+  def currentRow: Long = 0
+
+  /**
+   * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end`
+   * (inclusive).
+   *
+   * Both `start` and `end` are positions relative to the current row. For example, "0" means
+   * "current row", while "-1" means the row before the current row, and "5" means the fifth row
+   * after the current row.
+   *
+   * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+   * `Window.currentRow` to specify special boundary values, rather than using integral values
+   * directly.
+   *
+   * A row-based boundary is based on the position of the row within the partition. An offset
+   * indicates the number of rows above or below the current row at which the frame for the
+   * current row starts or ends. For instance, given a row-based sliding frame with a lower bound
+   * offset of -1 and an upper bound offset of +2, the frame for the row at index 5 would range
+   * from index 4 to index 7.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1)
+   *   df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  2|
+   *   |  1|       a|  3|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start
+   *   boundary start, inclusive. The frame is unbounded if this is the minimum long value
+   *   (`Window.unboundedPreceding`).
+   * @param end
+   *   boundary end, inclusive. The frame is unbounded if this is the maximum long value
+   *   (`Window.unboundedFollowing`).
+   * @since 3.4.0
+   */
+  // Note: when updating the doc for this method, also update WindowSpec.rowsBetween.
+  def rowsBetween(start: Long, end: Long): WindowSpec = {
+    spec.rowsBetween(start, end)
+  }
+
+  /**
+   * Creates a [[WindowSpec]] with the frame boundaries defined, from `start` (inclusive) to `end`
+   * (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, "0" means "current
+   * row", while "-1" means one unit before the current row's value, and "5" means five units
+   * after the current row's value.
+   *
+   * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+   * `Window.currentRow` to specify special boundary values, rather than using long values
+   * directly.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY expression(s). An offset
+   * is used to alter the value of the ORDER BY expression: for instance, if the current ORDER BY
+   * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for
+   * the current row will be 10 - 3 = 7. This, however, puts a number of constraints on the ORDER
+   * BY expressions: there can be only one expression, and it must have a numerical data type. An
+   * exception is made when the offset is unbounded, because no value modification is needed; in
+   * this case, multiple and non-numeric ORDER BY expressions are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1)
+   *   df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start
+   *   boundary start, inclusive. The frame is unbounded if this is the minimum long value
+   *   (`Window.unboundedPreceding`).
+   * @param end
+   *   boundary end, inclusive. The frame is unbounded if this is the maximum long value
+   *   (`Window.unboundedFollowing`).
+   * @since 3.4.0
+   */
+  // Note: when updating the doc for this method, also update WindowSpec.rangeBetween.
+  def rangeBetween(start: Long, end: Long): WindowSpec = {
+    spec.rangeBetween(start, end)
+  }
+
+  private[sql] def spec: WindowSpec = {
+    new WindowSpec(Seq.empty, Seq.empty, None)
+  }
+
+}
+
+/**
+ * Utility functions for defining windows in DataFrames.
+ *
+ * {{{
+ *   // PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+ *   Window.partitionBy("country").orderBy("date")
+ *     .rowsBetween(Window.unboundedPreceding, Window.currentRow)
+ *
+ *   // PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
+ *   Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
+ * }}}
+ *
+ * @since 3.4.0
+ */
+@Stable
+class Window private () // So we can see Window in JavaDoc.
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
new file mode 100644
index 00000000000..cecfb6a0d91
--- /dev/null
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.expressions
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.annotation.Stable
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.Column
+
+/**
+ * A window specification that defines the partitioning, ordering, and frame boundaries.
+ *
+ * Use the static methods in [[Window]] to create a [[WindowSpec]].
+ *
+ * @since 3.4.0
+ */
+@Stable
+class WindowSpec private[sql] (
+    partitionSpec: Seq[proto.Expression],
+    orderSpec: Seq[proto.Expression.SortOrder],
+    frame: Option[proto.Expression.Window.WindowFrame]) {
+
+  /**
+   * Defines the partitioning columns in a [[WindowSpec]].
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colName: String, colNames: String*): WindowSpec = {
+    partitionBy((colName +: colNames).map(Column(_)): _*)
+  }
+
+  /**
+   * Defines the partitioning columns in a [[WindowSpec]].
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(cols: Column*): WindowSpec = {
+    new WindowSpec(cols.map(_.expr), orderSpec, frame)
+  }
+
+  /**
+   * Defines the ordering columns in a [[WindowSpec]].
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(colName: String, colNames: String*): WindowSpec = {
+    orderBy((colName +: colNames).map(Column(_)): _*)
+  }
+
+  /**
+   * Defines the ordering columns in a [[WindowSpec]].
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(cols: Column*): WindowSpec = {
+    val sortOrder: Seq[proto.Expression.SortOrder] = cols.map(_.sortOrder)
+    new WindowSpec(partitionSpec, sortOrder, frame)
+  }
+
+  /**
+   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative positions from the current row. For example, "0" means
+   * "current row", while "-1" means the row before the current row, and "5" means the fifth row
+   * after the current row.
+   *
+   * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+   * `Window.currentRow` to specify special boundary values, rather than using integral values
+   * directly.
+   *
+   * A row-based boundary is based on the position of the row within the partition. An offset
+   * indicates the number of rows above or below the current row at which the frame for the
+   * current row starts or ends. For instance, given a row-based sliding frame with a lower bound
+   * offset of -1 and an upper bound offset of +2, the frame for the row at index 5 would range
+   * from index 4 to index 7.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy($"category").orderBy($"id").rowsBetween(Window.currentRow, 1)
+   *   df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  2|
+   *   |  1|       a|  3|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start
+   *   boundary start, inclusive. The frame is unbounded if this is the minimum long value
+   *   (`Window.unboundedPreceding`).
+   * @param end
+   *   boundary end, inclusive. The frame is unbounded if this is the maximum long value
+   *   (`Window.unboundedFollowing`).
+   * @since 3.4.0
+   */
+  // Note: when updating the doc for this method, also update Window.rowsBetween.
+  def rowsBetween(start: Long, end: Long): WindowSpec = {
+    new WindowSpec(
+      partitionSpec,
+      orderSpec,
+      Some(
+        toWindowFrame(
+          proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_ROW,
+          start,
+          end,
+          true)))
+  }
+
+  /**
+   * Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
+   *
+   * Both `start` and `end` are relative to the current row. For example, "0" means "current
+   * row", while "-1" means one unit before the current row's value, and "5" means five units
+   * after the current row's value.
+   *
+   * We recommend users use `Window.unboundedPreceding`, `Window.unboundedFollowing`, and
+   * `Window.currentRow` to specify special boundary values, rather than using long values
+   * directly.
+   *
+   * A range-based boundary is based on the actual value of the ORDER BY expression(s). An offset
+   * is used to alter the value of the ORDER BY expression: for instance, if the current ORDER BY
+   * expression has a value of 10 and the lower bound offset is -3, the resulting lower bound for
+   * the current row will be 10 - 3 = 7. This, however, puts a number of constraints on the ORDER
+   * BY expressions: there can be only one expression, and it must have a numerical data type. An
+   * exception is made when the offset is unbounded, because no value modification is needed; in
+   * this case, multiple and non-numeric ORDER BY expressions are allowed.
+   *
+   * {{{
+   *   import org.apache.spark.sql.expressions.Window
+   *   val df = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
+   *     .toDF("id", "category")
+   *   val byCategoryOrderedById =
+   *     Window.partitionBy($"category").orderBy($"id").rangeBetween(Window.currentRow, 1)
+   *   df.withColumn("sum", sum($"id") over byCategoryOrderedById).show()
+   *
+   *   +---+--------+---+
+   *   | id|category|sum|
+   *   +---+--------+---+
+   *   |  1|       b|  3|
+   *   |  2|       b|  5|
+   *   |  3|       b|  3|
+   *   |  1|       a|  4|
+   *   |  1|       a|  4|
+   *   |  2|       a|  2|
+   *   +---+--------+---+
+   * }}}
+   *
+   * @param start
+   *   boundary start, inclusive. The frame is unbounded if this is the minimum long value
+   *   (`Window.unboundedPreceding`).
+   * @param end
+   *   boundary end, inclusive. The frame is unbounded if this is the maximum long value
+   *   (`Window.unboundedFollowing`).
+   * @since 3.4.0
+   */
+  // Note: when updating the doc for this method, also update Window.rangeBetween.
+  def rangeBetween(start: Long, end: Long): WindowSpec = {
+    new WindowSpec(
+      partitionSpec,
+      orderSpec,
+      Some(
+        toWindowFrame(
+          proto.Expression.Window.WindowFrame.FrameType.FRAME_TYPE_RANGE,
+          start,
+          end,
+          false)))
+  }
+
+  /**
+   * Converts this [[WindowSpec]] into a [[Column]] with an aggregate expression.
+   */
+  private[sql] def withAggregate(aggregate: Column): Column = {
+    Column { builder =>
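+      // Wrap the aggregate in a proto Window expression, attaching this spec's
+      // partitioning, ordering, and optional frame.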
+      val windowBuilder = builder.getWindowBuilder
+      windowBuilder.setWindowFunction(aggregate.expr)
+      if (frame.isDefined) {
+        windowBuilder.setFrameSpec(frame.get)
+      }
+      windowBuilder.addAllPartitionSpec(partitionSpec.asJava)
+      windowBuilder.addAllOrderSpec(orderSpec.asJava)
+    }
+  }
+
+  private[sql] def toWindowFrame(
+      frameType: proto.Expression.Window.WindowFrame.FrameType,
+      start: Long,
+      end: Long,
+      isRowBetween: Boolean): proto.Expression.Window.WindowFrame = {
+    val windowFrameBuilder = proto.Expression.Window.WindowFrame.newBuilder()
+    windowFrameBuilder.setFrameType(frameType)
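+    // Encode the boundaries: 0 maps to CURRENT ROW and Long.MinValue/MaxValue map to
+    // UNBOUNDED; any other offset becomes a literal. Row frames require the offset to
+    // fit in an Int, while range frames keep the full Long value.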
+    start match {
+      case 0 => windowFrameBuilder.getLowerBuilder.setCurrentRow(true)
+      case Long.MinValue => windowFrameBuilder.getLowerBuilder.setUnbounded(true)
+      case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue =>
+        windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder
+          .setInteger(start.toInt)
+      case _ if !isRowBetween =>
+        windowFrameBuilder.getLowerBuilder.getValueBuilder.getLiteralBuilder.setLong(start)
+      case _ => throw new UnsupportedOperationException()
+    }
+
+    end match {
+      case 0 => windowFrameBuilder.getUpperBuilder.setCurrentRow(true)
+      case Long.MaxValue => windowFrameBuilder.getUpperBuilder.setUnbounded(true)
+      case x if isRowBetween && Int.MinValue <= x && x <= Int.MaxValue =>
+        windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder
+          .setInteger(end.toInt)
+      case _ if !isRowBetween =>
+        windowFrameBuilder.getUpperBuilder.getValueBuilder.getLiteralBuilder.setLong(end)
+      case _ => throw new UnsupportedOperationException()
+    }
+
+    windowFrameBuilder.build()
+  }
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b9ae66b2b9e..4cd7bfa0887 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types._
 
@@ -1645,4 +1646,14 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
   }
 
   /* Window API */
+  test("window") {
+    simple.select(
+      fn.min("id").over(Window.partitionBy(Column("a"), Column("b"))),
+      fn.min("id").over(Window.partitionBy("a", "b")),
+      fn.min("id").over(Window.orderBy(Column("a"), Column("b"))),
+      fn.min("id").over(Window.orderBy("a", "b")),
+      fn.min("id").over(Window.orderBy("a").rowsBetween(2L, 3L)),
+      fn.min("id").over(Window.orderBy("a").rangeBetween(2L, 3L)),
+      fn.count(Column("id")).over())
+  }
 }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain
new file mode 100644
index 00000000000..6f816957925
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/window.explain
@@ -0,0 +1,8 @@
+Project [min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING [...]
++- Project [id#0L, a#0, b#0, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS [...]
+   +- Window [count(id#0L) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count(id) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L]
+      +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 2, 3)) AS min(id) OVER (ORDER BY a ASC NULLS FIRST ROWS BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, cast(2 as int), cast(3 as int))) AS min(id) OVER (ORDER BY a ASC NULLS FIRST RANGE BETWEEN 2 FOLLOWING AND 3 FOLLOWING)#0L], [a#0 ASC NULLS FIRST]
+         +- Window [min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#0L, min(id#0L) windowspecdefinition(a#0 ASC NULLS FIRST, b#0 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS min(id) OVER (ORDER BY a ASC NULLS FIRST, b ASC NULLS FIRST [...]
+            +- Window [min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L, min(id#0L) windowspecdefinition(a#0, b#0, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS min(id) OVER (PARTITION BY a, b ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)#0L], [a#0, b#0]
+               +- Project [id#0L, a#0, b#0]
+                  +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.json b/connector/connect/common/src/test/resources/query-tests/queries/window.json
new file mode 100644
index 00000000000..8649a9d6e54
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/window.json
@@ -0,0 +1,205 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "partitionSpec": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "a"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }, {
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "b"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "a"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }, {
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "b"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }]
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "a"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }],
+        "frameSpec": {
+          "frameType": "FRAME_TYPE_ROW",
+          "lower": {
+            "value": {
+              "literal": {
+                "integer": 2
+              }
+            }
+          },
+          "upper": {
+            "value": {
+              "literal": {
+                "integer": 3
+              }
+            }
+          }
+        }
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "min",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        },
+        "orderSpec": [{
+          "child": {
+            "unresolvedAttribute": {
+              "unparsedIdentifier": "a"
+            }
+          },
+          "direction": "SORT_DIRECTION_ASCENDING",
+          "nullOrdering": "SORT_NULLS_FIRST"
+        }],
+        "frameSpec": {
+          "frameType": "FRAME_TYPE_RANGE",
+          "lower": {
+            "value": {
+              "literal": {
+                "long": "2"
+              }
+            }
+          },
+          "upper": {
+            "value": {
+              "literal": {
+                "long": "3"
+              }
+            }
+          }
+        }
+      }
+    }, {
+      "window": {
+        "windowFunction": {
+          "unresolvedFunction": {
+            "functionName": "count",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "id"
+              }
+            }]
+          }
+        }
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin
new file mode 100644
index 00000000000..caa9d66934c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/window.proto.bin
@@ -0,0 +1,43 @@
+�
+$Z" struct<id:bigint,a:int,b:double>!Z
+
+min
+id
+a
+b!Z
+
+min
+id
+a
+b-Z+
+
+min
+id
+
+a
+
+b-Z+
+
+min
+id
+
+a
+
+b4Z2
+
+min
+id
+
+a"
+0
+04Z2
+
+min
+id
+
+a"
+8
+8Z
+
+count
+id
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org