Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/15 01:32:55 UTC

[spark] branch master updated: [SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 79100ce225d [SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API
79100ce225d is described below

commit 79100ce225d8fafe58f8f61329e4f0ef51c2acf9
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 15 09:32:37 2023 +0800

    [SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API
    
    ### What changes were proposed in this pull request?
    This PR adds `any_value`, `approx_percentile`, `count_if`, `first_value`, `histogram_numeric`, `last_value` and `reduce` to the Scala, Python and Connect APIs.
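    
    For example, the new Scala API can be exercised as follows (an illustrative
    sketch, not part of the patch itself; `df` is assumed to contain a numeric
    column "a" and an array column "e"):
    
        df.select(any_value(col("a")))                              // some value of "a"
        df.select(approx_percentile(col("a"), lit(0.5), lit(100)))  // approximate median
        df.select(count_if(col("a") > 0))                           // rows where "a" is positive
        df.select(reduce(col("e"), lit(0), (acc, x) => acc + x))    // sum of the array elements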
    
    ### Why are the changes needed?
    These functions are already available as SQL built-ins; this change exposes `any_value`, `approx_percentile`, `count_if`, `first_value`, `histogram_numeric`, `last_value` and `reduce` through the Scala, Python and Connect APIs as well.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #41588 from beliefer/SPARK-43941.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     | 167 +++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  54 ++-
 ...> function_any_value_with_ignore_nulls.explain} |   2 +-
 ... function_any_value_with_respect_nulls.explain} |   2 +-
 .../function_approx_percentile.explain             |   2 +
 .../explain-results/function_count_if.explain      |   2 +
 ...function_first_value_with_ignore_nulls.explain} |   2 +-
 ...unction_first_value_with_respect_nulls.explain} |   2 +-
 ...in => function_first_with_ignore_nulls.explain} |   0
 ...n => function_first_with_respect_nulls.explain} |   2 +-
 .../function_histogram_numeric.explain             |   2 +
 ... function_last_value_with_ignore_nulls.explain} |   2 +-
 ...function_last_value_with_respect_nulls.explain} |   2 +-
 ...ain => function_last_with_ignore_nulls.explain} |   2 +-
 ...in => function_last_with_respect_nulls.explain} |   0
 .../explain-results/function_reduce.explain        |   2 +
 ...n => function_any_value_with_ignore_nulls.json} |   2 +-
 ...function_any_value_with_ignore_nulls.proto.bin} | Bin 180 -> 184 bytes
 ... => function_any_value_with_respect_nulls.json} |   6 +-
 ...unction_any_value_with_respect_nulls.proto.bin} | Bin 179 -> 178 bytes
 ..._first.json => function_approx_percentile.json} |   8 +-
 ...to.bin => function_approx_percentile.proto.bin} | Bin 180 -> 205 bytes
 ...{function_first.json => function_count_if.json} |  19 +-
 ...first.proto.bin => function_count_if.proto.bin} | Bin 180 -> 190 bytes
 ...=> function_first_value_with_ignore_nulls.json} |   2 +-
 ...nction_first_value_with_ignore_nulls.proto.bin} | Bin 180 -> 186 bytes
 ...> function_first_value_with_respect_nulls.json} |   6 +-
 ...ction_first_value_with_respect_nulls.proto.bin} | Bin 180 -> 180 bytes
 ....json => function_first_with_ignore_nulls.json} |   0
 ... => function_first_with_ignore_nulls.proto.bin} | Bin
 ...json => function_first_with_respect_nulls.json} |   2 +-
 ...=> function_first_with_respect_nulls.proto.bin} | Bin 180 -> 180 bytes
 ..._first.json => function_histogram_numeric.json} |   4 +-
 ...to.bin => function_histogram_numeric.proto.bin} | Bin 180 -> 192 bytes
 ... => function_last_value_with_ignore_nulls.json} |   2 +-
 ...unction_last_value_with_ignore_nulls.proto.bin} | Bin 180 -> 185 bytes
 ...=> function_last_value_with_respect_nulls.json} |   6 +-
 ...nction_last_value_with_respect_nulls.proto.bin} | Bin 179 -> 179 bytes
 ...t.json => function_last_with_ignore_nulls.json} |   2 +-
 ...n => function_last_with_ignore_nulls.proto.bin} | Bin 179 -> 179 bytes
 ....json => function_last_with_respect_nulls.json} |   0
 ... => function_last_with_respect_nulls.proto.bin} | Bin
 .../query-tests/queries/function_reduce.json       |  62 ++++
 ...n_first.proto.bin => function_reduce.proto.bin} | Bin 180 -> 232 bytes
 .../source/reference/pyspark.sql/functions.rst     |   7 +
 python/pyspark/sql/connect/functions.py            |  83 +++++
 python/pyspark/sql/functions.py                    | 396 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     | 161 ++++++++-
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  61 ++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |  30 +-
 50 files changed, 1059 insertions(+), 45 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 00b92e7bb79..8c1ee094c7a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -520,6 +520,34 @@ object functions {
    */
   def first(columnName: String): Column = first(Column(columnName))
 
+  /**
+   * Aggregate function: returns the first value in a group.
+   *
+   * @note
+   *   The function is non-deterministic because its result depends on the order of the rows,
+   *   which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def first_value(e: Column): Column = Column.fn("first_value", e)
+
+  /**
+   * Aggregate function: returns the first value in a group.
+   *
+   * By default the function returns the first value it sees. It will return the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+   *
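+   * For example, for a DataFrame `df` with an integer column "a" (an illustrative sketch):
+   * {{{
+   *   // the first non-null value of "a" across the whole frame
+   *   df.select(first_value(col("a"), lit(true)))
+   * }}}
+   *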
+   * @note
+   *   The function is non-deterministic because its result depends on the order of the rows,
+   *   which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def first_value(e: Column, ignoreNulls: Column): Column =
+    Column.fn("first_value", e, ignoreNulls)
+
   /**
    * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated or
    * not, returns 1 for aggregated or 0 for not aggregated in the result set.
@@ -732,6 +760,34 @@ object functions {
    */
   def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
 
+  /**
+   * Aggregate function: returns the last value in a group.
+   *
+   * @note
+   *   The function is non-deterministic because its result depends on the order of the rows,
+   *   which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def last_value(e: Column): Column = Column.fn("last_value", e)
+
+  /**
+   * Aggregate function: returns the last value in a group.
+   *
+   * By default the function returns the last value it sees. It will return the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+   *
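+   * For example, for a DataFrame `df` with an integer column "a" (an illustrative sketch):
+   * {{{
+   *   // the last non-null value of "a" across the whole frame
+   *   df.select(last_value(col("a"), lit(true)))
+   * }}}
+   *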
+   * @note
+   *   The function is non-deterministic because its result depends on the order of the rows,
+   *   which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def last_value(e: Column, ignoreNulls: Column): Column =
+    Column.fn("last_value", e, ignoreNulls)
+
   /**
    * Aggregate function: returns the most frequent value in a group.
    *
@@ -849,6 +905,25 @@ object functions {
   def percentile_approx(e: Column, percentage: Column, accuracy: Column): Column =
     Column.fn("percentile_approx", e, percentage, accuracy)
 
+  /**
+   * Aggregate function: returns the approximate `percentile` of the numeric column `col`, which
+   * is the smallest value in the ordered `col` values (sorted from least to greatest) such that
+   * no more than `percentage` of `col` values is less than or equal to that value.
+   *
+   * If percentage is an array, each value must be between 0.0 and 1.0. If it is a single floating
+   * point value, it must be between 0.0 and 1.0.
+   *
+   * The accuracy parameter is a positive numeric literal which controls approximation accuracy
+   * at the cost of memory. A higher value of accuracy yields better accuracy; 1.0/accuracy is
+   * the relative error of the approximation.
+   *
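+   * For example, for a DataFrame `df` with a numeric column "a" (an illustrative sketch):
+   * {{{
+   *   // approximate median of "a", with relative error at most 1.0/100
+   *   df.select(approx_percentile(col("a"), lit(0.5), lit(100)))
+   * }}}
+   *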
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def approx_percentile(e: Column, percentage: Column, accuracy: Column): Column = {
+    Column.fn("approx_percentile", e, percentage, accuracy)
+  }
+
   /**
    * Aggregate function: returns the product of all numerical elements in a group.
    *
@@ -1101,6 +1176,49 @@ object functions {
    */
   def regr_syy(y: Column, x: Column): Column = Column.fn("regr_syy", y, x)
 
+  /**
+   * Aggregate function: returns some value of `e` for a group of rows.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def any_value(e: Column): Column = Column.fn("any_value", e)
+
+  /**
+   * Aggregate function: returns some value of `e` for a group of rows. If `ignoreNulls` is
+   * true, returns only non-null values.
+   *
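+   * For example, for a DataFrame `df` with columns "k" and "a" (an illustrative sketch):
+   * {{{
+   *   // an arbitrary non-null value of "a" per group of "k"
+   *   df.groupBy("k").agg(any_value(col("a"), lit(true)))
+   * }}}
+   *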
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def any_value(e: Column, ignoreNulls: Column): Column =
+    Column.fn("any_value", e, ignoreNulls)
+
+  /**
+   * Aggregate function: returns the number of `TRUE` values for the expression.
+   *
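+   * For example, for a DataFrame `df` with an integer column "a" (an illustrative sketch):
+   * {{{
+   *   // the number of rows where "a" is positive
+   *   df.select(count_if(col("a") > 0))
+   * }}}
+   *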
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def count_if(e: Column): Column = Column.fn("count_if", e)
+
+  /**
+   * Aggregate function: computes a histogram on numeric column `e` using `nBins` bins. The
+   * return value is an array of (x, y) pairs representing the centers of the histogram's bins.
+   * As the value of `nBins` is increased, the histogram approximation gets finer-grained, but
+   * may yield artifacts around outliers. In practice, 20-40 histogram bins appear to work well,
+   * with more bins being required for skewed or smaller datasets. Note that this function
+   * creates a histogram with non-uniform bin widths. It offers no guarantees in terms of the
+   * mean-squared-error of the histogram, but in practice is comparable to the histograms
+   * produced by the R/S-Plus statistical computing packages. Note: the output type of the 'x'
+   * field in the return value is propagated from the input value consumed in the aggregate
+   * function.
+   *
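+   * For example, for a DataFrame `df` with a numeric column "a" (an illustrative sketch):
+   * {{{
+   *   // a 5-bin histogram of "a", returned as an array of (x, y) structs
+   *   df.select(histogram_numeric(col("a"), lit(5)))
+   * }}}
+   *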
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def histogram_numeric(e: Column, nBins: Column): Column =
+    Column.fn("histogram_numeric", e, nBins)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
   //////////////////////////////////////////////////////////////////////////////////////////////
@@ -5069,6 +5187,55 @@ object functions {
   def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
     aggregate(expr, initialValue, merge, c => c)
 
+  /**
+   * Applies a binary operator to an initial state and all elements in the array, and reduces this
+   * to a single state. The final state is converted into the final result by applying a finish
+   * function.
+   * {{{
+   *   df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10))
+   * }}}
+   *
+   * @param expr
+   *   the input array column
+   * @param initialValue
+   *   the initial value
+   * @param merge
+   *   (combined_value, input_value) => combined_value, the merge function that merges an input
+   *   value into the combined_value
+   * @param finish
+   *   combined_value => final_value, the lambda function to convert the combined value of all
+   *   inputs to the final result
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def reduce(
+      expr: Column,
+      initialValue: Column,
+      merge: (Column, Column) => Column,
+      finish: Column => Column): Column =
+    Column.fn("reduce", expr, initialValue, createLambda(merge), createLambda(finish))
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the array, and reduces this
+   * to a single state.
+   * {{{
+   *   df.select(reduce(col("i"), lit(0), (acc, x) => acc + x))
+   * }}}
+   *
+   * @param expr
+   *   the input array column
+   * @param initialValue
+   *   the initial value
+   * @param merge
+   *   (combined_value, input_value) => combined_value, the merge function that merges an input
+   *   value into the combined_value
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def reduce(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
+    reduce(expr, initialValue, merge, c => c)
+
   /**
    * Merge two given arrays, element-wise, into a single array using a function. If one array is
    * shorter, nulls are appended at the end to match the length of the longer array, before
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b2b70acdf60..90e0d74e195 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -950,16 +950,56 @@ class PlanGenerationTestSuite
     fn.covar_samp("a", "b")
   }
 
-  functionTest("first") {
+  functionTest("first with ignore nulls") {
     fn.first("a", ignoreNulls = true)
   }
 
+  functionTest("first with respect nulls") {
+    fn.first("a")
+  }
+
+  functionTest("first_value with ignore nulls") {
+    fn.first_value(fn.col("a"), ignoreNulls = lit(true))
+  }
+
+  functionTest("first_value with respect nulls") {
+    fn.first_value(fn.col("a"))
+  }
+
+  functionTest("any_value with ignore nulls") {
+    fn.any_value(fn.col("a"), ignoreNulls = lit(true))
+  }
+
+  functionTest("any_value with respect nulls") {
+    fn.any_value(fn.col("a"))
+  }
+
   functionTest("kurtosis") {
     fn.kurtosis("a")
   }
 
-  functionTest("last") {
-    fn.last("a", ignoreNulls = false)
+  functionTest("last with ignore nulls") {
+    fn.last("a", ignoreNulls = true)
+  }
+
+  functionTest("last with respect nulls") {
+    fn.last("a")
+  }
+
+  functionTest("last_value with ignore nulls") {
+    fn.last_value(fn.col("a"), ignoreNulls = lit(true))
+  }
+
+  functionTest("last_value with respect nulls") {
+    fn.last_value(fn.col("a"))
+  }
+
+  functionTest("count_if") {
+    fn.count_if(fn.col("a").gt(0))
+  }
+
+  functionTest("histogram_numeric") {
+    fn.histogram_numeric(fn.col("a"), lit(10))
   }
 
   functionTest("mode") {
@@ -998,6 +1038,10 @@ class PlanGenerationTestSuite
     fn.percentile_approx(fn.col("a"), fn.lit(0.3), fn.lit(20))
   }
 
+  functionTest("approx_percentile") {
+    fn.approx_percentile(fn.col("a"), fn.lit(0.3), fn.lit(20))
+  }
+
   functionTest("product") {
     fn.product(fn.col("a"))
   }
@@ -2127,6 +2171,10 @@ class PlanGenerationTestSuite
     fn.aggregate(fn.col("e"), lit(0), (x, y) => x + y)
   }
 
+  functionTest("reduce") {
+    fn.reduce(fn.col("e"), lit(0), (x, y) => x + y)
+  }
+
   functionTest("zip_with") {
     fn.zip_with(fn.col("e"), fn.col("e"), (x, y) => x + y)
   }
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
similarity index 57%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
index 0675353b706..15b0279de12 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, true) AS any_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
similarity index 57%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
index 0675353b706..137d5a1b052 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, false) AS any_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain
new file mode 100644
index 00000000000..669e771a5dc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain
@@ -0,0 +1,2 @@
+Aggregate [approx_percentile(a#0, 0.3, 20, 0, 0) AS approx_percentile(a, 0.3, 20)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain
new file mode 100644
index 00000000000..1c23bbf6bce
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain
@@ -0,0 +1,2 @@
+Aggregate [count(if (((a#0 > 0) = false)) null else (a#0 > 0)) AS count_if((a > 0))#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
similarity index 53%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
index 0675353b706..c4ca5da1650 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first_value(a#0, true) AS first_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
similarity index 53%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
index 0675353b706..9d605bd61bc 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first_value(a#0, false) AS first_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_ignore_nulls.explain
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_ignore_nulls.explain
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
similarity index 59%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
index 0675353b706..81d1a3b1204 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, false) AS first(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain
new file mode 100644
index 00000000000..273e486db72
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain
@@ -0,0 +1,2 @@
+Aggregate [histogram_numeric(a#0, 10, 0, 0) AS histogram_numeric(a, 10)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
similarity index 54%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
index 0675353b706..8c3cf54aedf 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [last_value(a#0, true) AS last_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
similarity index 54%
rename from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
rename to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
index 0675353b706..705bd9954d5 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [last_value(a#0, false) AS last_value(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
similarity index 60%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
index a7ae558d5c9..207c073446b 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [last(a#0, false) AS last(a)#0]
+Aggregate [last(a#0, true) AS last(a)#0]
 +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_respect_nulls.explain
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
rename to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_respect_nulls.explain
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain
new file mode 100644
index 00000000000..d25e5a306e2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain
@@ -0,0 +1,2 @@
+Project [reduce(e#0, 0, lambdafunction((lambda x#0 + lambda y#0), lambda x#0, lambda y#0, false), lambdafunction(lambda x#0, lambda x#0, false)) AS reduce(e, 0, lambdafunction((namedlambdavariable() + namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
index dc33bad3c50..011d43a91d0 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "any_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin
similarity index 74%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin
index cb029dfd26b..546c696ecfd 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
similarity index 83%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
index dc33bad3c50..7d4f5a2de38 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "any_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
           }
-        }, {
-          "literal": {
-            "boolean": true
-          }
         }]
       }
     }]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin
similarity index 73%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin
index 69221737be6..124a7ad7efe 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
similarity index 80%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
index dc33bad3c50..490a2dcd869 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
@@ -13,14 +13,18 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "approx_percentile",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
           }
         }, {
           "literal": {
-            "boolean": true
+            "double": 0.3
+          }
+        }, {
+          "literal": {
+            "integer": 20
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin
similarity index 64%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin
index cb029dfd26b..ae73716fa43 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
similarity index 58%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
index dc33bad3c50..669477bbc5d 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
@@ -13,14 +13,19 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "count_if",
         "arguments": [{
-          "unresolvedAttribute": {
-            "unparsedIdentifier": "a"
-          }
-        }, {
-          "literal": {
-            "boolean": true
+          "unresolvedFunction": {
+            "functionName": "\u003e",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "a"
+              }
+            }, {
+              "literal": {
+                "integer": 0
+              }
+            }]
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin
similarity index 72%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin
index cb029dfd26b..07c65ebaa42 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
index dc33bad3c50..af55fe44ae8 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "first_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin
similarity index 73%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin
index cb029dfd26b..7121c820aa7 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
similarity index 83%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
index dc33bad3c50..8276e35893f 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "first_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
           }
-        }, {
-          "literal": {
-            "boolean": true
-          }
         }]
       }
     }]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin
index cb029dfd26b..b843d521115 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.json
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.json
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.proto.bin
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.proto.bin
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
similarity index 95%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
index dc33bad3c50..0e315cc6b1b 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
@@ -20,7 +20,7 @@
           }
         }, {
           "literal": {
-            "boolean": true
+            "boolean": false
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin
similarity index 98%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin
index cb029dfd26b..bf1d48903df 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
index dc33bad3c50..548b4977ddc 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
@@ -13,14 +13,14 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "histogram_numeric",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
           }
         }, {
           "literal": {
-            "boolean": true
+            "integer": 10
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin
similarity index 71%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin
index cb029dfd26b..81dbcd476ec 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
index dc33bad3c50..e78a456082c 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "last_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin
similarity index 74%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin
index cb029dfd26b..c04f8385995 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
similarity index 83%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
index dc33bad3c50..cb509997e65 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
     },
     "expressions": [{
       "unresolvedFunction": {
-        "functionName": "first",
+        "functionName": "last_value",
         "arguments": [{
           "unresolvedAttribute": {
             "unparsedIdentifier": "a"
           }
-        }, {
-          "literal": {
-            "boolean": true
-          }
         }]
       }
     }]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin
index 69221737be6..cee9838b704 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
similarity index 95%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
index f26e5887ed5..6d1be02c785 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
@@ -20,7 +20,7 @@
           }
         }, {
           "literal": {
-            "boolean": false
+            "boolean": true
           }
         }]
       }
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin
similarity index 98%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin
index 69221737be6..f6590582c6f 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.json
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_last.json
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.json
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.proto.bin
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.proto.bin
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json
new file mode 100644
index 00000000000..b35437bd980
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json
@@ -0,0 +1,62 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "reduce",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 0
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedNamedLambdaVariable": {
+                "nameParts": ["x"]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin
similarity index 56%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
rename to connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin
index cb029dfd26b..0cc2cd204d8 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 1ccf602b06d..697edfa45ad 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -253,8 +253,10 @@ Aggregate Functions
 .. autosummary::
     :toctree: api/
 
+    any_value
     approxCountDistinct
     approx_count_distinct
+    approx_percentile
     avg
     collect_list
     collect_set
@@ -262,15 +264,19 @@ Aggregate Functions
     count
     count_distinct
     countDistinct
+    count_if
     covar_pop
     covar_samp
     first
+    first_value
     grouping
     grouping_id
+    histogram_numeric
     hll_sketch_agg
     hll_union_agg
     kurtosis
     last
+    last_value
     max
     max_by
     mean
@@ -281,6 +287,7 @@ Aggregate Functions
     percentile
     percentile_approx
     product
+    reduce
     regr_avgx
     regr_avgy
     regr_count
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index e19247161f7..eeaf25bdc7d 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -1078,6 +1078,26 @@ def percentile_approx(
 percentile_approx.__doc__ = pysparkfuncs.percentile_approx.__doc__
 
 
+def approx_percentile(
+    col: "ColumnOrName",
+    percentage: Union[Column, float, List[float], Tuple[float]],
+    accuracy: Union[Column, float] = 10000,
+) -> Column:
+    if isinstance(percentage, Column):
+        percentage_col = percentage
+    elif isinstance(percentage, (list, tuple)):
+        # Convert tuple to list
+        percentage_col = lit(list(percentage))
+    else:
+        # Probably scalar
+        percentage_col = lit(percentage)
+
+    return _invoke_function("approx_percentile", _to_col(col), percentage_col, lit(accuracy))
+
+
+approx_percentile.__doc__ = pysparkfuncs.approx_percentile.__doc__
+
+
 def product(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("product", col)
 
@@ -1269,6 +1289,53 @@ def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = No
 nth_value.__doc__ = pysparkfuncs.nth_value.__doc__
 
 
+def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("any_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("any_value", col, ignoreNulls)
+
+
+any_value.__doc__ = pysparkfuncs.any_value.__doc__
+
+
+def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("first_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("first_value", col, ignoreNulls)
+
+
+first_value.__doc__ = pysparkfuncs.first_value.__doc__
+
+
+def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("last_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("last_value", col, ignoreNulls)
+
+
+last_value.__doc__ = pysparkfuncs.last_value.__doc__
+
+
+def count_if(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("count_if", col)
+
+
+count_if.__doc__ = pysparkfuncs.count_if.__doc__
+
+
+def histogram_numeric(col: "ColumnOrName", nBins: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("histogram_numeric", col, nBins)
+
+
+histogram_numeric.__doc__ = pysparkfuncs.histogram_numeric.__doc__
+
+
 def ntile(n: int) -> Column:
     return _invoke_function("ntile", lit(n))
 
@@ -1313,6 +1380,22 @@ def aggregate(
 aggregate.__doc__ = pysparkfuncs.aggregate.__doc__
 
 
+def reduce(
+    col: "ColumnOrName",
+    initialValue: "ColumnOrName",
+    merge: Callable[[Column, Column], Column],
+    finish: Optional[Callable[[Column], Column]] = None,
+) -> Column:
+    if finish is not None:
+        return _invoke_higher_order_function("reduce", [col, initialValue], [merge, finish])
+
+    else:
+        return _invoke_higher_order_function("reduce", [col, initialValue], [merge])
+
+
+reduce.__doc__ = pysparkfuncs.reduce.__doc__
+
+
 def array(*cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]) -> Column:
     if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
         cols = cols[0]  # type: ignore[assignment]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index c4d26c8aaf5..b5c3ec9cec6 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3852,6 +3852,83 @@ def percentile_approx(
     return _invoke_function("percentile_approx", _to_java_column(col), percentage, accuracy)
 
 
+@try_remote_functions
+def approx_percentile(
+    col: "ColumnOrName",
+    percentage: Union[Column, float, List[float], Tuple[float]],
+    accuracy: Union[Column, float] = 10000,
+) -> Column:
+    """Returns the approximate `percentile` of the numeric column `col` which is the smallest value
+    in the ordered `col` values (sorted from least to greatest) such that no more than `percentage`
+    of `col` values is less than the value or equal to that value.
+
+
+    .. versionadded:: 3.1.0
+
+    .. versionchanged:: 3.4.0
+        Supports Spark Connect.
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        input column.
+    percentage : :class:`~pyspark.sql.Column`, float, list of floats or tuple of floats
+        percentage in decimal (must be between 0.0 and 1.0).
+        When percentage is an array, each value of the percentage array must be between 0.0 and 1.0.
+        In this case, returns the approximate percentile array of column col
+        at the given percentage array.
+    accuracy : :class:`~pyspark.sql.Column` or float
+        is a positive numeric literal which controls approximation accuracy
+        at the cost of memory. Higher value of accuracy yields better accuracy,
+        1.0/accuracy is the relative error of the approximation. (default: 10000).
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        approximate `percentile` of the numeric column.
+
+    Examples
+    --------
+    >>> key = (col("id") % 3).alias("key")
+    >>> value = (randn(42) + key * 10).alias("value")
+    >>> df = spark.range(0, 1000, 1, 1).select(key, value)
+    >>> df.select(
+    ...     approx_percentile("value", [0.25, 0.5, 0.75], 1000000).alias("quantiles")
+    ... ).printSchema()
+    root
+     |-- quantiles: array (nullable = true)
+     |    |-- element: double (containsNull = false)
+
+    >>> df.groupBy("key").agg(
+    ...     approx_percentile("value", 0.5, lit(1000000)).alias("median")
+    ... ).printSchema()
+    root
+     |-- key: long (nullable = true)
+     |-- median: double (nullable = true)
+    """
+    sc = get_active_spark_context()
+
+    if isinstance(percentage, (list, tuple)):
+        # A local list/tuple of percentages: build an array column from the literals
+        percentage = _invoke_function(
+            "array", _to_seq(sc, [_create_column_from_literal(x) for x in percentage])
+        )._jc
+    elif isinstance(percentage, Column):
+        # Already a Column
+        percentage = _to_java_column(percentage)
+    else:
+        # A single numeric value: treat it as a literal
+        percentage = _create_column_from_literal(percentage)
+
+    accuracy = (
+        _to_java_column(accuracy)
+        if isinstance(accuracy, Column)
+        else _create_column_from_literal(accuracy)
+    )
+
+    return _invoke_function("approx_percentile", _to_java_column(col), percentage, accuracy)
+
+
 @try_remote_functions
 def rand(seed: Optional[int] = None) -> Column:
     """Generates a random column with independent and identically distributed (i.i.d.) samples
@@ -4785,6 +4862,253 @@ def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = Fa
     return _invoke_function("nth_value", _to_java_column(col), offset, ignoreNulls)
 
 
+@try_remote_functions
+def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    """Returns some value of `col` for a group of rows.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+    ignoreNulls : :class:`~pyspark.sql.Column` or bool, optional
+        if true, only non-null values are considered.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        some value of `col` for a group of rows.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(None, 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +----+---+
+    |  c1| c2|
+    +----+---+
+    |NULL|  1|
+    |   a|  2|
+    |   a|  3|
+    |   b|  8|
+    |   b|  2|
+    +----+---+
+    <BLANKLINE>
+    >>> df.select(any_value('c1'), any_value('c2')).collect()
+    [Row(any_value(c1)=None, any_value(c2)=1)]
+    >>> df.select(any_value('c1', True), any_value('c2', True)).collect()
+    [Row(any_value(c1)='a', any_value(c2)=1)]
+    """
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("any_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("any_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    """Returns the first value of `col` for a group of rows. It will return the first non-null
+    value it sees when `ignoreNulls` is set to true. If all values are null, then null is returned.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+    ignoreNulls : :class:`~pyspark.sql.Column` or bool, optional
+        if set to true, skip nulls and return the first non-null value seen.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the first value of `col` for a group of rows.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(None, 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +----+---+
+    |  c1| c2|
+    +----+---+
+    |NULL|  1|
+    |   a|  2|
+    |   a|  3|
+    |   b|  8|
+    |   b|  2|
+    +----+---+
+    <BLANKLINE>
+    >>> df.select(first_value('c1').alias('a'), first_value('c2').alias('b')).collect()
+    [Row(a=None, b=1)]
+    >>> df.select(first_value('c1', True).alias('a'), first_value('c2', True).alias('b')).collect()
+    [Row(a='a', b=1)]
+    """
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("first_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("first_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+    """Returns the last value of `col` for a group of rows. It will return the last non-null
+    value it sees when `ignoreNulls` is set to true. If all values are null, then null is returned.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+    ignoreNulls : :class:`~pyspark.sql.Column` or bool, optional
+        if set to true, skip nulls and return the last non-null value seen.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the last value of `col` for a group of rows.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("a", 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             (None, 2)], ["c1", "c2"])
+    >>> df.show()
+    +----+---+
+    |  c1| c2|
+    +----+---+
+    |   a|  1|
+    |   a|  2|
+    |   a|  3|
+    |   b|  8|
+    |NULL|  2|
+    +----+---+
+    <BLANKLINE>
+    >>> df.select(last_value('c1').alias('a'), last_value('c2').alias('b')).collect()
+    [Row(a=None, b=2)]
+    >>> df.select(last_value('c1', True).alias('a'), last_value('c2', True).alias('b')).collect()
+    [Row(a='b', b=2)]
+    """
+    if ignoreNulls is None:
+        return _invoke_function_over_columns("last_value", col)
+    else:
+        ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+        return _invoke_function_over_columns("last_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def count_if(col: "ColumnOrName") -> Column:
+    """Returns the number of `TRUE` values for the `col`.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the number of `TRUE` values in `col`.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("a", 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +---+---+
+    | c1| c2|
+    +---+---+
+    |  a|  1|
+    |  a|  2|
+    |  a|  3|
+    |  b|  8|
+    |  b|  2|
+    +---+---+
+    <BLANKLINE>
+    >>> df.select(count_if(col('c2') % 2 == 0)).show()
+    +------------------------+
+    |count_if(((c2 % 2) = 0))|
+    +------------------------+
+    |                       3|
+    +------------------------+
+    <BLANKLINE>
+    """
+    return _invoke_function_over_columns("count_if", col)
+
+
+@try_remote_functions
+def histogram_numeric(col: "ColumnOrName", nBins: "ColumnOrName") -> Column:
+    """Computes a histogram on numeric 'col' using nb bins.
+    The return value is an array of (x,y) pairs representing the centers of the
+    histogram's bins. As the value of 'nb' is increased, the histogram approximation
+    gets finer-grained, but may yield artifacts around outliers. In practice, 20-40
+    histogram bins appear to work well, with more bins being required for skewed or
+    smaller datasets. Note that this function creates a histogram with non-uniform
+    bin widths. It offers no guarantees in terms of the mean-squared-error of the
+    histogram, but in practice is comparable to the histograms produced by the R/S-Plus
+    statistical computing packages. Note: the output type of the 'x' field in the return value is
+    propagated from the input value consumed in the aggregate function.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to work on.
+    nBins : :class:`~pyspark.sql.Column` or str
+        the number of bins to use when computing the histogram.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        a histogram on numeric `col` using `nBins` bins.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("a", 1),
+    ...                             ("a", 2),
+    ...                             ("a", 3),
+    ...                             ("b", 8),
+    ...                             ("b", 2)], ["c1", "c2"])
+    >>> df.show()
+    +---+---+
+    | c1| c2|
+    +---+---+
+    |  a|  1|
+    |  a|  2|
+    |  a|  3|
+    |  b|  8|
+    |  b|  2|
+    +---+---+
+    <BLANKLINE>
+    >>> df.select(histogram_numeric('c2', lit(5))).show()
+    +------------------------+
+    |histogram_numeric(c2, 5)|
+    +------------------------+
+    |    [{1, 1.0}, {2, 1....|
+    +------------------------+
+    <BLANKLINE>
+    """
+    return _invoke_function_over_columns("histogram_numeric", col, nBins)
+
+
 @try_remote_functions
 def ntile(n: int) -> Column:
     """
@@ -11024,6 +11348,78 @@ def aggregate(
         return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge])
 
 
+@try_remote_functions
+def reduce(
+    col: "ColumnOrName",
+    initialValue: "ColumnOrName",
+    merge: Callable[[Column, Column], Column],
+    finish: Optional[Callable[[Column], Column]] = None,
+) -> Column:
+    """
+    Applies a binary operator to an initial state and all elements in the array,
+    and reduces this to a single state. The final state is converted into the final result
+    by applying a finish function.
+
+    Both functions can use methods of :class:`~pyspark.sql.Column`, functions defined in
+    :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+    Python ``UserDefinedFunctions`` are not supported
+    (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+    initialValue : :class:`~pyspark.sql.Column` or str
+        initial value. Name of column or expression
+    merge : function
+        a binary function ``(acc: Column, x: Column) -> Column`` returning an expression
+        of the same type as ``initialValue``
+    finish : function
+        an optional unary function ``(x: Column) -> Column: ...``
+        used to convert accumulated value.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        final value after aggregate function is applied.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
+    >>> df.select(reduce("values", lit(0.0), lambda acc, x: acc + x).alias("sum")).show()
+    +----+
+    | sum|
+    +----+
+    |42.0|
+    +----+
+
+    >>> def merge(acc, x):
+    ...     count = acc.count + 1
+    ...     sum = acc.sum + x
+    ...     return struct(count.alias("count"), sum.alias("sum"))
+    >>> df.select(
+    ...     reduce(
+    ...         "values",
+    ...         struct(lit(0).alias("count"), lit(0.0).alias("sum")),
+    ...         merge,
+    ...         lambda acc: acc.sum / acc.count,
+    ...     ).alias("mean")
+    ... ).show()
+    +----+
+    |mean|
+    +----+
+    | 8.4|
+    +----+
+    """
+    if finish is not None:
+        return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge, finish])
+    else:
+        return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge])
+
+
 @try_remote_functions
 def zip_with(
     left: "ColumnOrName",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 2f9b458a9d8..d797866664e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -549,6 +549,33 @@ object functions {
    */
   def first(columnName: String): Column = first(Column(columnName))
 
+  /**
+   * Aggregate function: returns the first value in a group.
+   *
+   * @note The function is non-deterministic because its result depends on the order of the rows
+   * which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def first_value(e: Column): Column = first(e)
+
+  /**
+   * Aggregate function: returns the first value in a group.
+   *
+   * The function by default returns the first value it sees. It will return the first non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+   *
+   * @note The function is non-deterministic because its result depends on the order of the rows
+   * which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def first_value(e: Column, ignoreNulls: Column): Column = withAggregateFunction {
+    new First(e.expr, ignoreNulls.expr)
+  }
+
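For illustration, a minimal spark-shell sketch of the new `first_value` overloads,
assuming a SparkSession `spark` is in scope:

    import org.apache.spark.sql.functions.{col, lit, first_value}
    import spark.implicits._

    val df = Seq((null.asInstanceOf[String], 1), ("a", 2), ("b", 3)).toDF("s", "n")
    // first_value(s) returns the first value seen (null here, on a single local
    // partition); first_value(s, lit(true)) skips nulls and returns "a".
    df.agg(first_value(col("s")), first_value(col("s"), lit(true))).show()
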
   /**
    * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
    * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
@@ -724,7 +751,7 @@ object functions {
    * @since 2.0.0
    */
   def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
-    new Last(e.expr, ignoreNulls)
+    Last(e.expr, ignoreNulls)
   }
 
   /**
@@ -771,6 +798,33 @@ object functions {
    */
   def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
 
+  /**
+   * Aggregate function: returns the last value in a group.
+   *
+   * @note The function is non-deterministic because its result depends on the order of the rows
+   * which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def last_value(e: Column): Column = last(e)
+
+  /**
+   * Aggregate function: returns the last value in a group.
+   *
+   * The function by default returns the last value it sees. It will return the last non-null
+   * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+   *
+   * @note The function is non-deterministic because its result depends on the order of the rows
+   * which may be non-deterministic after a shuffle.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def last_value(e: Column, ignoreNulls: Column): Column = withAggregateFunction {
+    new Last(e.expr, ignoreNulls.expr)
+  }
+
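Likewise for `last_value`, under the same spark-shell assumptions:

    import org.apache.spark.sql.functions.{col, lit, last_value}
    import spark.implicits._

    val df = Seq(("a", 1), ("b", 2), (null.asInstanceOf[String], 3)).toDF("s", "n")
    // last_value(s) is null here; last_value(s, lit(true)) skips the trailing
    // null and returns "b".
    df.agg(last_value(col("s")), last_value(col("s"), lit(true))).show()
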
   /**
    * Aggregate function: returns the most frequent value in a group.
    *
@@ -906,6 +960,26 @@ object functions {
     }
   }
 
+  /**
+   * Aggregate function: returns the approximate `percentile` of the numeric column `col` which
+   * is the smallest value in the ordered `col` values (sorted from least to greatest) such that
+   * no more than `percentage` of `col` values is less than or equal to that value.
+   *
+   * If percentage is an array, each value must be between 0.0 and 1.0.
+   * If it is a single floating point value, it must be between 0.0 and 1.0.
+   *
+   * The accuracy parameter is a positive numeric literal
+   * which controls approximation accuracy at the cost of memory.
+   * A higher value of accuracy yields better accuracy; 1.0/accuracy
+   * is the relative error of the approximation.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def approx_percentile(e: Column, percentage: Column, accuracy: Column): Column = {
+    percentile_approx(e, percentage, accuracy)
+  }
+
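Since `approx_percentile` delegates to `percentile_approx`, both call shapes
(a single percentage and a percentage array) behave identically; a short sketch,
assuming a spark-shell `spark`:

    import org.apache.spark.sql.functions.{approx_percentile, array, col, lit}
    import spark.implicits._

    val df = (1 to 100).toDF("v")
    // Median and quartiles with accuracy 10000 (relative error ~ 1.0/10000).
    df.agg(
      approx_percentile(col("v"), lit(0.5), lit(10000)),
      approx_percentile(col("v"), array(lit(0.25), lit(0.75)), lit(10000))
    ).show(false)
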
   /**
    * Aggregate function: returns the product of all numerical elements in a group.
    *
@@ -1165,6 +1239,50 @@ object functions {
    */
   def regr_syy(y: Column, x: Column): Column = withAggregateFunction { RegrSYY(y.expr, x.expr) }
 
+  /**
+   * Aggregate function: returns some value of `e` for a group of rows.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def any_value(e: Column): Column = withAggregateFunction { new AnyValue(e.expr) }
+
+  /**
+   * Aggregate function: returns some value of `e` for a group of rows.
+   * If `ignoreNulls` is true, returns only non-null values.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def any_value(e: Column, ignoreNulls: Column): Column =
+    withAggregateFunction { new AnyValue(e.expr, ignoreNulls.expr) }
+
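A quick sketch of `any_value`, same assumptions:

    import org.apache.spark.sql.functions.{any_value, col, lit}
    import spark.implicits._

    val df = Seq((null.asInstanceOf[String], 1), ("a", 2)).toDF("s", "n")
    // any_value(s) may pick the null; any_value(s, lit(true)) only considers
    // non-null values.
    df.agg(any_value(col("s")), any_value(col("s"), lit(true))).show()
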
+  /**
+   * Aggregate function: returns the number of `TRUE` values for the expression.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def count_if(e: Column): Column = withAggregateFunction { CountIf(e.expr) }
+
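`count_if` takes a boolean expression; a minimal sketch, same assumptions:

    import org.apache.spark.sql.functions.{col, count_if}
    import spark.implicits._

    val df = Seq(1, 2, 3, 8, 2).toDF("n")
    // Counts the rows where the predicate is TRUE: three even values here.
    df.agg(count_if(col("n") % 2 === 0)).show()
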
+  /**
+   * Aggregate function: computes a histogram on numeric `e` using `nBins` bins.
+   * The return value is an array of (x,y) pairs representing the centers of the
+   * histogram's bins. As the value of `nBins` is increased, the histogram approximation
+   * gets finer-grained, but may yield artifacts around outliers. In practice, 20-40
+   * histogram bins appear to work well, with more bins being required for skewed or
+   * smaller datasets. Note that this function creates a histogram with non-uniform
+   * bin widths. It offers no guarantees in terms of the mean-squared-error of the
+   * histogram, but in practice is comparable to the histograms produced by the R/S-Plus
+   * statistical computing packages. Note: the output type of the 'x' field in the return value is
+   * propagated from the input value consumed in the aggregate function.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def histogram_numeric(e: Column, nBins: Column): Column =
+    withAggregateFunction { new HistogramNumeric(e.expr, nBins.expr) }
+
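A sketch of `histogram_numeric`, same assumptions; the result is an array of
(x, y) structs holding bin centers and heights:

    import org.apache.spark.sql.functions.{col, histogram_numeric, lit}
    import spark.implicits._

    val df = Seq(1, 2, 3, 8, 2).toDF("n")
    df.agg(histogram_numeric(col("n"), lit(5))).show(false)
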
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
   //////////////////////////////////////////////////////////////////////////////////////////////
@@ -5030,6 +5148,47 @@ object functions {
   def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
     aggregate(expr, initialValue, merge, c => c)
 
+  /**
+   * Applies a binary operator to an initial state and all elements in the array,
+   * and reduces this to a single state. The final state is converted into the final result
+   * by applying a finish function.
+   * {{{
+   *   df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10))
+   * }}}
+   *
+   * @param expr the input array column
+   * @param initialValue the initial value
+   * @param merge (combined_value, input_value) => combined_value, the merge function to merge
+   *              an input value to the combined_value
+   * @param finish combined_value => final_value, the lambda function to convert the combined value
+   *               of all inputs to final result
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def reduce(
+      expr: Column,
+      initialValue: Column,
+      merge: (Column, Column) => Column,
+      finish: Column => Column): Column = aggregate(expr, initialValue, merge, finish)
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the array,
+   * and reduces this to a single state.
+   * {{{
+   *   df.select(reduce(col("i"), lit(0), (acc, x) => acc + x))
+   * }}}
+   *
+   * @param expr the input array column
+   * @param initialValue the initial value
+   * @param merge (combined_value, input_value) => combined_value, the merge function to merge
+   *              an input value to the combined_value
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def reduce(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
+    aggregate(expr, initialValue, merge, c => c)
+
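Both `reduce` overloads simply forward to `aggregate`; a sketch of the two call
shapes, same spark-shell assumptions:

    import org.apache.spark.sql.functions.{col, lit, reduce}
    import spark.implicits._

    val df = Seq(Seq(20.0, 4.0, 2.0, 6.0, 10.0)).toDF("values")
    df.select(
      // Plain fold: sums the array elements.
      reduce(col("values"), lit(0.0), (acc, x) => acc + x),
      // With a finish function applied to the accumulated value.
      reduce(col("values"), lit(0.0), (acc, x) => acc + x, _ * 10)
    ).show()
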
   /**
    * Merge two given arrays, element-wise, into a single array using a function.
    * If one array is shorter, nulls are appended at the end to match the length of the longer
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 586c3217dab..42ad1176abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1042,6 +1042,29 @@ class DataFrameAggregateSuite extends QueryTest
     )
   }
 
+  test("any_value") {
+    checkAnswer(
+      courseSales.groupBy("course").agg(
+        any_value(col("year")),
+        any_value(col("year"), lit(true))
+      ),
+      Row("Java", 2012, 2012) :: Row("dotNET", 2012, 2012) :: Nil
+    )
+  }
+
+  test("approx_percentile") {
+    checkAnswer(
+      courseSales.groupBy("course").agg(
+        approx_percentile(col("earnings"), lit(0.3), lit(10000)),
+        approx_percentile(col("earnings"), array(lit(0.3), lit(0.6)), lit(10000)),
+        approx_percentile(col("earnings"), lit(0.3), lit(1)),
+        approx_percentile(col("earnings"), array(lit(0.3), lit(0.6)), lit(1))
+      ),
+      Row("Java", 20000.0, Seq(20000.0, 30000.0), 20000.0, Seq(20000.0, 20000.0)) ::
+        Row("dotNET", 5000.0, Seq(5000.0, 10000.0), 5000.0, Seq(5000.0, 5000.0)) :: Nil
+    )
+  }
+
   test("count_if") {
     withTempView("tempView") {
       Seq(("a", None), ("a", Some(1)), ("a", Some(2)), ("a", Some(3)),
@@ -1093,6 +1116,44 @@ class DataFrameAggregateSuite extends QueryTest
           context = ExpectedContext(fragment = "COUNT_IF(x)", start = 7, stop = 17))
       }
     }
+
+    checkAnswer(
+      courseSales.groupBy("course").agg(
+        count_if(col("earnings").gt(10000)),
+        count_if(col("course").equalTo("Java").and(col("earnings").gt(10000)))
+      ),
+      Row("Java", 2, 2) :: Row("dotNET", 1, 0) :: Nil
+    )
+  }
+
+  test("first_value") {
+    checkAnswer(
+      nullStrings.orderBy(col("n").desc).agg(
+        first_value(col("s")),
+        first_value(col("s"), lit(true))
+      ),
+      Row(null, "ABC") :: Nil
+    )
+  }
+
+  test("last_value") {
+    checkAnswer(
+      nullStrings.agg(
+        last_value(col("s")),
+        last_value(col("s"), lit(true))
+      ),
+      Row(null, "ABC") :: Nil
+    )
+  }
+
+  test("histogram_numeric") {
+    checkAnswer(
+      courseSales.groupBy("course").agg(
+        histogram_numeric(col("earnings"), lit(5))
+      ),
+      Row("Java", Seq(Row(20000.0, 1.0), Row(30000.0, 1.0))) ::
+        Row("dotNET", Seq(Row(5000.0, 1.0), Row(10000.0, 1.0), Row(48000.0, 1.0))) :: Nil
+    )
   }
 
   Seq(true, false).foreach { value =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index d1946469529..01e447895ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -74,8 +74,8 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     )
 
     val excludedSqlFunctions = Set(
-      "random", "first_value", "last_value",
-      "approx_percentile", "array_agg", "char_length", "character_length",
+      "random",
+      "array_agg", "char_length", "character_length",
       "lcase", "position", "printf", "substr", "ucase", "day", "cardinality", "sha",
       "getbit",
       // aliases for existing functions
@@ -4193,12 +4193,24 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
           Row(31),
           Row(0),
           Row(null)))
+      checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x)),
+        Seq(
+          Row(25),
+          Row(31),
+          Row(0),
+          Row(null)))
       checkAnswer(df.select(aggregate(col("i"), lit(0), (acc, x) => acc + x, _ * 10)),
         Seq(
           Row(250),
           Row(310),
           Row(0),
           Row(null)))
+      checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10)),
+        Seq(
+          Row(250),
+          Row(310),
+          Row(0),
+          Row(null)))
     }
 
     // Test with local relation, the Project will be evaluated without codegen
@@ -4239,6 +4251,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
           Row(null),
           Row(0),
           Row(null)))
+      checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x)),
+        Seq(
+          Row(25),
+          Row(null),
+          Row(0),
+          Row(null)))
       checkAnswer(
         df.select(
           aggregate(col("i"), lit(0), (acc, x) => acc + x, acc => coalesce(acc, lit(0)) * 10)),
@@ -4247,6 +4265,14 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
           Row(0),
           Row(0),
           Row(null)))
+      checkAnswer(
+        df.select(
+          reduce(col("i"), lit(0), (acc, x) => acc + x, acc => coalesce(acc, lit(0)) * 10)),
+        Seq(
+          Row(250),
+          Row(0),
+          Row(0),
+          Row(null)))
     }
 
     // Test with local relation, the Project will be evaluated without codegen


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org