You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/15 01:32:55 UTC
[spark] branch master updated: [SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 79100ce225d [SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API
79100ce225d is described below
commit 79100ce225d8fafe58f8f61329e4f0ef51c2acf9
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 15 09:32:37 2023 +0800
[SPARK-43941][SQL][PYTHON][CONNECT] Add any_value, approx_percentile, count_if, first_value, histogram_numeric, last_value, reduce to Scala, Python and Connect API
### What changes were proposed in this pull request?
This PR adds `any_value`, `approx_percentile`, `count_if`, `first_value`, `histogram_numeric`, `last_value`, and `reduce` to the Scala, Python and Connect APIs.
### Why are the changes needed?
Add `any_value`, `approx_percentile`, `count_if`, `first_value`, `histogram_numeric`, `last_value`, `reduce` to Scala, Python and Connect API
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
New test cases.
Closes #41588 from beliefer/SPARK-43941.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../scala/org/apache/spark/sql/functions.scala | 167 +++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 54 ++-
...> function_any_value_with_ignore_nulls.explain} | 2 +-
... function_any_value_with_respect_nulls.explain} | 2 +-
.../function_approx_percentile.explain | 2 +
.../explain-results/function_count_if.explain | 2 +
...function_first_value_with_ignore_nulls.explain} | 2 +-
...unction_first_value_with_respect_nulls.explain} | 2 +-
...in => function_first_with_ignore_nulls.explain} | 0
...n => function_first_with_respect_nulls.explain} | 2 +-
.../function_histogram_numeric.explain | 2 +
... function_last_value_with_ignore_nulls.explain} | 2 +-
...function_last_value_with_respect_nulls.explain} | 2 +-
...ain => function_last_with_ignore_nulls.explain} | 2 +-
...in => function_last_with_respect_nulls.explain} | 0
.../explain-results/function_reduce.explain | 2 +
...n => function_any_value_with_ignore_nulls.json} | 2 +-
...function_any_value_with_ignore_nulls.proto.bin} | Bin 180 -> 184 bytes
... => function_any_value_with_respect_nulls.json} | 6 +-
...unction_any_value_with_respect_nulls.proto.bin} | Bin 179 -> 178 bytes
..._first.json => function_approx_percentile.json} | 8 +-
...to.bin => function_approx_percentile.proto.bin} | Bin 180 -> 205 bytes
...{function_first.json => function_count_if.json} | 19 +-
...first.proto.bin => function_count_if.proto.bin} | Bin 180 -> 190 bytes
...=> function_first_value_with_ignore_nulls.json} | 2 +-
...nction_first_value_with_ignore_nulls.proto.bin} | Bin 180 -> 186 bytes
...> function_first_value_with_respect_nulls.json} | 6 +-
...ction_first_value_with_respect_nulls.proto.bin} | Bin 180 -> 180 bytes
....json => function_first_with_ignore_nulls.json} | 0
... => function_first_with_ignore_nulls.proto.bin} | Bin
...json => function_first_with_respect_nulls.json} | 2 +-
...=> function_first_with_respect_nulls.proto.bin} | Bin 180 -> 180 bytes
..._first.json => function_histogram_numeric.json} | 4 +-
...to.bin => function_histogram_numeric.proto.bin} | Bin 180 -> 192 bytes
... => function_last_value_with_ignore_nulls.json} | 2 +-
...unction_last_value_with_ignore_nulls.proto.bin} | Bin 180 -> 185 bytes
...=> function_last_value_with_respect_nulls.json} | 6 +-
...nction_last_value_with_respect_nulls.proto.bin} | Bin 179 -> 179 bytes
...t.json => function_last_with_ignore_nulls.json} | 2 +-
...n => function_last_with_ignore_nulls.proto.bin} | Bin 179 -> 179 bytes
....json => function_last_with_respect_nulls.json} | 0
... => function_last_with_respect_nulls.proto.bin} | Bin
.../query-tests/queries/function_reduce.json | 62 ++++
...n_first.proto.bin => function_reduce.proto.bin} | Bin 180 -> 232 bytes
.../source/reference/pyspark.sql/functions.rst | 7 +
python/pyspark/sql/connect/functions.py | 83 +++++
python/pyspark/sql/functions.py | 396 +++++++++++++++++++++
.../scala/org/apache/spark/sql/functions.scala | 161 ++++++++-
.../apache/spark/sql/DataFrameAggregateSuite.scala | 61 ++++
.../apache/spark/sql/DataFrameFunctionsSuite.scala | 30 +-
50 files changed, 1059 insertions(+), 45 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 00b92e7bb79..8c1ee094c7a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -520,6 +520,34 @@ object functions {
*/
def first(columnName: String): Column = first(Column(columnName))
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * @note
+ * The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def first_value(e: Column): Column = Column.fn("first_value", e)
+
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first value it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @note
+ * The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def first_value(e: Column, ignoreNulls: Column): Column =
+ Column.fn("first_value", e, ignoreNulls)
+
/**
* Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated or
* not, returns 1 for aggregated or 0 for not aggregated in the result set.
@@ -732,6 +760,34 @@ object functions {
*/
def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * @note
+ * The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def last_value(e: Column): Column = Column.fn("last_value", e)
+
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last value it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @note
+ * The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def last_value(e: Column, ignoreNulls: Column): Column =
+ Column.fn("last_value", e, ignoreNulls)
+
/**
* Aggregate function: returns the most frequent value in a group.
*
@@ -849,6 +905,25 @@ object functions {
def percentile_approx(e: Column, percentage: Column, accuracy: Column): Column =
Column.fn("percentile_approx", e, percentage, accuracy)
+ /**
+ * Aggregate function: returns the approximate `percentile` of the numeric column `e` which is
+ * the smallest value in the ordered `e` values (sorted from least to greatest) such that no
+ * more than `percentage` of `e` values is less than the value or equal to that value.
+ *
+ * If percentage is an array, each value must be between 0.0 and 1.0. If it is a single floating
+ * point value, it must be between 0.0 and 1.0.
+ *
+ * The accuracy parameter is a positive numeric literal which controls approximation accuracy at
+ * the cost of memory. Higher value of accuracy yields better accuracy, 1.0/accuracy is the
+ * relative error of the approximation.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def approx_percentile(e: Column, percentage: Column, accuracy: Column): Column = {
+ Column.fn("approx_percentile", e, percentage, accuracy)
+ }
+
/**
* Aggregate function: returns the product of all numerical elements in a group.
*
@@ -1101,6 +1176,49 @@ object functions {
*/
def regr_syy(y: Column, x: Column): Column = Column.fn("regr_syy", y, x)
+ /**
+ * Aggregate function: returns some value of `e` for a group of rows.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def any_value(e: Column): Column = Column.fn("any_value", e)
+
+ /**
+ * Aggregate function: returns some value of `e` for a group of rows. If `ignoreNulls` is true,
+ * returns only non-null values.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def any_value(e: Column, ignoreNulls: Column): Column =
+ Column.fn("any_value", e, ignoreNulls)
+
+ /**
+ * Aggregate function: returns the number of `TRUE` values for the expression.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def count_if(e: Column): Column = Column.fn("count_if", e)
+
+ /**
+ * Aggregate function: computes a histogram on numeric `e` using `nBins` bins. The return value
+ * is an array of (x,y) pairs representing the centers of the histogram's bins. As the value of
+ * `nBins` is increased, the histogram approximation gets finer-grained, but may yield artifacts
+ * around outliers. In practice, 20-40 histogram bins appear to work well, with more bins being
+ * required for skewed or smaller datasets. Note that this function creates a histogram with
+ * non-uniform bin widths. It offers no guarantees in terms of the mean-squared-error of the
+ * histogram, but in practice is comparable to the histograms produced by the R/S-Plus
+ * statistical computing packages. Note: the output type of the 'x' field in the return value is
+ * propagated from the input value consumed in the aggregate function.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def histogram_numeric(e: Column, nBins: Column): Column =
+ Column.fn("histogram_numeric", e, nBins)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -5069,6 +5187,55 @@ object functions {
def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
aggregate(expr, initialValue, merge, c => c)
+ /**
+ * Applies a binary operator to an initial state and all elements in the array, and reduces this
+ * to a single state. The final state is converted into the final result by applying a finish
+ * function.
+ * {{{
+ * df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10))
+ * }}}
+ *
+ * @param expr
+ * the input array column
+ * @param initialValue
+ * the initial value
+ * @param merge
+ * (combined_value, input_value) => combined_value, the merge function to merge an input value
+ * to the combined_value
+ * @param finish
+ * combined_value => final_value, the lambda function to convert the combined value of all
+ * inputs to final result
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def reduce(
+ expr: Column,
+ initialValue: Column,
+ merge: (Column, Column) => Column,
+ finish: Column => Column): Column =
+ Column.fn("reduce", expr, initialValue, createLambda(merge), createLambda(finish))
+
+ /**
+ * Applies a binary operator to an initial state and all elements in the array, and reduces this
+ * to a single state.
+ * {{{
+ * df.select(reduce(col("i"), lit(0), (acc, x) => acc + x))
+ * }}}
+ *
+ * @param expr
+ * the input array column
+ * @param initialValue
+ * the initial value
+ * @param merge
+ * (combined_value, input_value) => combined_value, the merge function to merge an input value
+ * to the combined_value
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def reduce(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
+ reduce(expr, initialValue, merge, c => c)
+
/**
* Merge two given arrays, element-wise, into a single array using a function. If one array is
* shorter, nulls are appended at the end to match the length of the longer array, before
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index b2b70acdf60..90e0d74e195 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -950,16 +950,56 @@ class PlanGenerationTestSuite
fn.covar_samp("a", "b")
}
- functionTest("first") {
+ functionTest("first with ignore nulls") {
fn.first("a", ignoreNulls = true)
}
+ functionTest("first with respect nulls") {
+ fn.first("a")
+ }
+
+ functionTest("first_value with ignore nulls") {
+ fn.first_value(fn.col("a"), ignoreNulls = lit(true))
+ }
+
+ functionTest("first_value with respect nulls") {
+ fn.first_value(fn.col("a"))
+ }
+
+ functionTest("any_value with ignore nulls") {
+ fn.any_value(fn.col("a"), ignoreNulls = lit(true))
+ }
+
+ functionTest("any_value with respect nulls") {
+ fn.any_value(fn.col("a"))
+ }
+
functionTest("kurtosis") {
fn.kurtosis("a")
}
- functionTest("last") {
- fn.last("a", ignoreNulls = false)
+ functionTest("last with ignore nulls") {
+ fn.last("a", ignoreNulls = true)
+ }
+
+ functionTest("last with respect nulls") {
+ fn.last("a")
+ }
+
+ functionTest("last_value with ignore nulls") {
+ fn.last_value(fn.col("a"), ignoreNulls = lit(true))
+ }
+
+ functionTest("last_value with respect nulls") {
+ fn.last_value(fn.col("a"))
+ }
+
+ functionTest("count_if") {
+ fn.count_if(fn.col("a").gt(0))
+ }
+
+ functionTest("histogram_numeric") {
+ fn.histogram_numeric(fn.col("a"), lit(10))
}
functionTest("mode") {
@@ -998,6 +1038,10 @@ class PlanGenerationTestSuite
fn.percentile_approx(fn.col("a"), fn.lit(0.3), fn.lit(20))
}
+ functionTest("approx_percentile") {
+ fn.approx_percentile(fn.col("a"), fn.lit(0.3), fn.lit(20))
+ }
+
functionTest("product") {
fn.product(fn.col("a"))
}
@@ -2127,6 +2171,10 @@ class PlanGenerationTestSuite
fn.aggregate(fn.col("e"), lit(0), (x, y) => x + y)
}
+ functionTest("reduce") {
+ fn.reduce(fn.col("e"), lit(0), (x, y) => x + y)
+ }
+
functionTest("zip_with") {
fn.zip_with(fn.col("e"), fn.col("e"), (x, y) => x + y)
}
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
similarity index 57%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
index 0675353b706..15b0279de12 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, true) AS any_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
similarity index 57%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
index 0675353b706..137d5a1b052 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_any_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, false) AS any_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain
new file mode 100644
index 00000000000..669e771a5dc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_approx_percentile.explain
@@ -0,0 +1,2 @@
+Aggregate [approx_percentile(a#0, 0.3, 20, 0, 0) AS approx_percentile(a, 0.3, 20)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain
new file mode 100644
index 00000000000..1c23bbf6bce
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_if.explain
@@ -0,0 +1,2 @@
+Aggregate [count(if (((a#0 > 0) = false)) null else (a#0 > 0)) AS count_if((a > 0))#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
similarity index 53%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
index 0675353b706..c4ca5da1650 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first_value(a#0, true) AS first_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
similarity index 53%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
index 0675353b706..9d605bd61bc 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first_value(a#0, false) AS first_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_ignore_nulls.explain
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_ignore_nulls.explain
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
similarity index 59%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
index 0675353b706..81d1a3b1204 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_first_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [first(a#0, false) AS first(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain
new file mode 100644
index 00000000000..273e486db72
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_histogram_numeric.explain
@@ -0,0 +1,2 @@
+Aggregate [histogram_numeric(a#0, 10, 0, 0) AS histogram_numeric(a, 10)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
similarity index 54%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
index 0675353b706..8c3cf54aedf 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [last_value(a#0, true) AS last_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
similarity index 54%
rename from connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
rename to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
index 0675353b706..705bd9954d5 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_first.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_value_with_respect_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [first(a#0, true) AS first(a)#0]
+Aggregate [last_value(a#0, false) AS last_value(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
similarity index 60%
copy from connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
copy to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
index a7ae558d5c9..207c073446b 100644
--- a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_ignore_nulls.explain
@@ -1,2 +1,2 @@
-Aggregate [last(a#0, false) AS last(a)#0]
+Aggregate [last(a#0, true) AS last(a)#0]
+- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_respect_nulls.explain
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/explain-results/function_last.explain
rename to connector/connect/common/src/test/resources/query-tests/explain-results/function_last_with_respect_nulls.explain
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain
new file mode 100644
index 00000000000..d25e5a306e2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reduce.explain
@@ -0,0 +1,2 @@
+Project [reduce(e#0, 0, lambdafunction((lambda x#0 + lambda y#0), lambda x#0, lambda y#0, false), lambdafunction(lambda x#0, lambda x#0, false)) AS reduce(e, 0, lambdafunction((namedlambdavariable() + namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
index dc33bad3c50..011d43a91d0 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "any_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin
similarity index 74%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin
index cb029dfd26b..546c696ecfd 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
similarity index 83%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
index dc33bad3c50..7d4f5a2de38 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "any_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
}
- }, {
- "literal": {
- "boolean": true
- }
}]
}
}]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin
similarity index 73%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin
index 69221737be6..124a7ad7efe 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_any_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
similarity index 80%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
index dc33bad3c50..490a2dcd869 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.json
@@ -13,14 +13,18 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "approx_percentile",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
}
}, {
"literal": {
- "boolean": true
+ "double": 0.3
+ }
+ }, {
+ "literal": {
+ "integer": 20
}
}]
}
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin
similarity index 64%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin
index cb029dfd26b..ae73716fa43 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_approx_percentile.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
similarity index 58%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
index dc33bad3c50..669477bbc5d 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.json
@@ -13,14 +13,19 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "count_if",
"arguments": [{
- "unresolvedAttribute": {
- "unparsedIdentifier": "a"
- }
- }, {
- "literal": {
- "boolean": true
+ "unresolvedFunction": {
+ "functionName": "\u003e",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "literal": {
+ "integer": 0
+ }
+ }]
}
}]
}
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin
similarity index 72%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin
index cb029dfd26b..07c65ebaa42 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_count_if.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
index dc33bad3c50..af55fe44ae8 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "first_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin
similarity index 73%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin
index cb029dfd26b..7121c820aa7 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
similarity index 83%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
index dc33bad3c50..8276e35893f 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "first_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
}
- }, {
- "literal": {
- "boolean": true
- }
}]
}
}]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin
index cb029dfd26b..b843d521115 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.json
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.json
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.proto.bin
similarity index 100%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_ignore_nulls.proto.bin
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
similarity index 95%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
index dc33bad3c50..0e315cc6b1b 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.json
@@ -20,7 +20,7 @@
}
}, {
"literal": {
- "boolean": true
+ "boolean": false
}
}]
}
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin
similarity index 98%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin
index cb029dfd26b..bf1d48903df 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_first_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
index dc33bad3c50..548b4977ddc 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.json
@@ -13,14 +13,14 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "histogram_numeric",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
}
}, {
"literal": {
- "boolean": true
+ "integer": 10
}
}]
}
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin
similarity index 71%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin
index cb029dfd26b..81dbcd476ec 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_histogram_numeric.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
similarity index 94%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
index dc33bad3c50..e78a456082c 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.json
@@ -13,7 +13,7 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "last_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin
similarity index 74%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin
index cb029dfd26b..c04f8385995 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
similarity index 83%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_first.json
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
index dc33bad3c50..cb509997e65 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_first.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.json
@@ -13,15 +13,11 @@
},
"expressions": [{
"unresolvedFunction": {
- "functionName": "first",
+ "functionName": "last_value",
"arguments": [{
"unresolvedAttribute": {
"unparsedIdentifier": "a"
}
- }, {
- "literal": {
- "boolean": true
- }
}]
}
}]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin
similarity index 89%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin
index 69221737be6..cee9838b704 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_value_with_respect_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
similarity index 95%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.json
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
index f26e5887ed5..6d1be02c785 100644
--- a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.json
@@ -20,7 +20,7 @@
}
}, {
"literal": {
- "boolean": false
+ "boolean": true
}
}]
}
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin
similarity index 98%
copy from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
copy to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin
index 69221737be6..f6590582c6f 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_ignore_nulls.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.json b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.json
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_last.json
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.json
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.proto.bin
similarity index 100%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_last.proto.bin
rename to connector/connect/common/src/test/resources/query-tests/queries/function_last_with_respect_nulls.proto.bin
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json
new file mode 100644
index 00000000000..b35437bd980
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.json
@@ -0,0 +1,62 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "reduce",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "e"
+ }
+ }, {
+ "literal": {
+ "integer": 0
+ }
+ }, {
+ "lambdaFunction": {
+ "function": {
+ "unresolvedFunction": {
+ "functionName": "+",
+ "arguments": [{
+ "unresolvedNamedLambdaVariable": {
+ "nameParts": ["x"]
+ }
+ }, {
+ "unresolvedNamedLambdaVariable": {
+ "nameParts": ["y"]
+ }
+ }]
+ }
+ },
+ "arguments": [{
+ "nameParts": ["x"]
+ }, {
+ "nameParts": ["y"]
+ }]
+ }
+ }, {
+ "lambdaFunction": {
+ "function": {
+ "unresolvedNamedLambdaVariable": {
+ "nameParts": ["x"]
+ }
+ },
+ "arguments": [{
+ "nameParts": ["x"]
+ }]
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin
similarity index 56%
rename from connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin
rename to connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin
index cb029dfd26b..0cc2cd204d8 100644
Binary files a/connector/connect/common/src/test/resources/query-tests/queries/function_first.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/function_reduce.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 1ccf602b06d..697edfa45ad 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -253,8 +253,10 @@ Aggregate Functions
.. autosummary::
:toctree: api/
+ any_value
approxCountDistinct
approx_count_distinct
+ approx_percentile
avg
collect_list
collect_set
@@ -262,15 +264,19 @@ Aggregate Functions
count
count_distinct
countDistinct
+ count_if
covar_pop
covar_samp
first
+ first_value
grouping
grouping_id
+ histogram_numeric
hll_sketch_agg
hll_union_agg
kurtosis
last
+ last_value
max
max_by
mean
@@ -281,6 +287,7 @@ Aggregate Functions
percentile
percentile_approx
product
+ reduce
regr_avgx
regr_avgy
regr_count
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index e19247161f7..eeaf25bdc7d 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -1078,6 +1078,26 @@ def percentile_approx(
percentile_approx.__doc__ = pysparkfuncs.percentile_approx.__doc__
+def approx_percentile(
+ col: "ColumnOrName",
+ percentage: Union[Column, float, List[float], Tuple[float]],
+ accuracy: Union[Column, float] = 10000,
+) -> Column:
+ if isinstance(percentage, Column):
+ percentage_col = percentage
+ elif isinstance(percentage, (list, tuple)):
+ # Convert tuple to list
+ percentage_col = lit(list(percentage))
+ else:
+ # Probably scalar
+ percentage_col = lit(percentage)
+
+ return _invoke_function("approx_percentile", _to_col(col), percentage_col, lit(accuracy))
+
+
+approx_percentile.__doc__ = pysparkfuncs.approx_percentile.__doc__
+
+
def product(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("product", col)
@@ -1269,6 +1289,53 @@ def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = No
nth_value.__doc__ = pysparkfuncs.nth_value.__doc__
+def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("any_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("any_value", col, ignoreNulls)
+
+
+any_value.__doc__ = pysparkfuncs.any_value.__doc__
+
+
+def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("first_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("first_value", col, ignoreNulls)
+
+
+first_value.__doc__ = pysparkfuncs.first_value.__doc__
+
+
+def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("last_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("last_value", col, ignoreNulls)
+
+
+last_value.__doc__ = pysparkfuncs.last_value.__doc__
+
+
+def count_if(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("count_if", col)
+
+
+count_if.__doc__ = pysparkfuncs.count_if.__doc__
+
+
+def histogram_numeric(col: "ColumnOrName", nBins: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("histogram_numeric", col, nBins)
+
+
+histogram_numeric.__doc__ = pysparkfuncs.histogram_numeric.__doc__
+
+
def ntile(n: int) -> Column:
return _invoke_function("ntile", lit(n))
@@ -1313,6 +1380,22 @@ def aggregate(
aggregate.__doc__ = pysparkfuncs.aggregate.__doc__
+def reduce(
+ col: "ColumnOrName",
+ initialValue: "ColumnOrName",
+ merge: Callable[[Column, Column], Column],
+ finish: Optional[Callable[[Column], Column]] = None,
+) -> Column:
+ if finish is not None:
+ return _invoke_higher_order_function("reduce", [col, initialValue], [merge, finish])
+
+ else:
+ return _invoke_higher_order_function("reduce", [col, initialValue], [merge])
+
+
+reduce.__doc__ = pysparkfuncs.reduce.__doc__
+
+
def array(*cols: Union["ColumnOrName", List["ColumnOrName"], Tuple["ColumnOrName", ...]]) -> Column:
if len(cols) == 1 and isinstance(cols[0], (list, set, tuple)):
cols = cols[0] # type: ignore[assignment]
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index c4d26c8aaf5..b5c3ec9cec6 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3852,6 +3852,83 @@ def percentile_approx(
return _invoke_function("percentile_approx", _to_java_column(col), percentage, accuracy)
+@try_remote_functions
+def approx_percentile(
+ col: "ColumnOrName",
+ percentage: Union[Column, float, List[float], Tuple[float]],
+ accuracy: Union[Column, float] = 10000,
+) -> Column:
+ """Returns the approximate `percentile` of the numeric column `col` which is the smallest value
+ in the ordered `col` values (sorted from least to greatest) such that no more than `percentage`
+ of `col` values is less than the value or equal to that value.
+
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ input column.
+ percentage : :class:`~pyspark.sql.Column`, float, list of floats or tuple of floats
+ percentage in decimal (must be between 0.0 and 1.0).
+ When percentage is an array, each value of the percentage array must be between 0.0 and 1.0.
+ In this case, returns the approximate percentile array of column col
+ at the given percentage array.
+ accuracy : :class:`~pyspark.sql.Column` or float
+ is a positive numeric literal which controls approximation accuracy
+ at the cost of memory. Higher value of accuracy yields better accuracy,
+ 1.0/accuracy is the relative error of the approximation. (default: 10000).
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ approximate `percentile` of the numeric column.
+
+ Examples
+ --------
+ >>> key = (col("id") % 3).alias("key")
+ >>> value = (randn(42) + key * 10).alias("value")
+ >>> df = spark.range(0, 1000, 1, 1).select(key, value)
+ >>> df.select(
+ ... approx_percentile("value", [0.25, 0.5, 0.75], 1000000).alias("quantiles")
+ ... ).printSchema()
+ root
+ |-- quantiles: array (nullable = true)
+ | |-- element: double (containsNull = false)
+
+ >>> df.groupBy("key").agg(
+ ... approx_percentile("value", 0.5, lit(1000000)).alias("median")
+ ... ).printSchema()
+ root
+ |-- key: long (nullable = true)
+ |-- median: double (nullable = true)
+ """
+ sc = get_active_spark_context()
+
+ if isinstance(percentage, (list, tuple)):
+ # A local list
+ percentage = _invoke_function(
+ "array", _to_seq(sc, [_create_column_from_literal(x) for x in percentage])
+ )._jc
+ elif isinstance(percentage, Column):
+ # Already a Column
+ percentage = _to_java_column(percentage)
+ else:
+ # Probably scalar
+ percentage = _create_column_from_literal(percentage)
+
+ accuracy = (
+ _to_java_column(accuracy)
+ if isinstance(accuracy, Column)
+ else _create_column_from_literal(accuracy)
+ )
+
+ return _invoke_function("approx_percentile", _to_java_column(col), percentage, accuracy)
+
+
@try_remote_functions
def rand(seed: Optional[int] = None) -> Column:
"""Generates a random column with independent and identically distributed (i.i.d.) samples
@@ -4785,6 +4862,253 @@ def nth_value(col: "ColumnOrName", offset: int, ignoreNulls: Optional[bool] = Fa
return _invoke_function("nth_value", _to_java_column(col), offset, ignoreNulls)
+@try_remote_functions
+def any_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ """Returns some value of `col` for a group of rows.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+ ignoreNulls : :class:`~pyspark.sql.Column` or bool
+ if first value is null then look for first non-null value.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ some value of `col` for a group of rows.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(None, 1),
+ ... ("a", 2),
+ ... ("a", 3),
+ ... ("b", 8),
+ ... ("b", 2)], ["c1", "c2"])
+ >>> df.show()
+ +----+---+
+ | c1| c2|
+ +----+---+
+ |NULL| 1|
+ | a| 2|
+ | a| 3|
+ | b| 8|
+ | b| 2|
+ +----+---+
+ <BLANKLINE>
+ >>> df.select(any_value('c1'), any_value('c2')).collect()
+ [Row(any_value(c1)=None, any_value(c2)=1)]
+ >>> df.select(any_value('c1', True), any_value('c2', True)).collect()
+ [Row(any_value(c1)='a', any_value(c2)=1)]
+ """
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("any_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("any_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def first_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ """Returns the first value of `col` for a group of rows. It will return the first non-null
+ value it sees when `ignoreNulls` is set to true. If all values are null, then null is returned.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+ ignoreNulls : :class:`~pyspark.sql.Column` or bool
+ if first value is null then look for first non-null value.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ first value of `col` for a group of rows.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(None, 1),
+ ... ("a", 2),
+ ... ("a", 3),
+ ... ("b", 8),
+ ... ("b", 2)], ["c1", "c2"])
+ >>> df.show()
+ +----+---+
+ | c1| c2|
+ +----+---+
+ |NULL| 1|
+ | a| 2|
+ | a| 3|
+ | b| 8|
+ | b| 2|
+ +----+---+
+ <BLANKLINE>
+ >>> df.select(first_value('c1').alias('a'), first_value('c2').alias('b')).collect()
+ [Row(a=None, b=1)]
+ >>> df.select(first_value('c1', True).alias('a'), first_value('c2', True).alias('b')).collect()
+ [Row(a='a', b=1)]
+ """
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("first_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("first_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def last_value(col: "ColumnOrName", ignoreNulls: Optional[Union[bool, Column]] = None) -> Column:
+ """Returns the last value of `col` for a group of rows. It will return the last non-null
+ value it sees when `ignoreNulls` is set to true. If all values are null, then null is returned.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+ ignoreNulls : :class:`~pyspark.sql.Column` or bool
+ if last value is null then look for last non-null value.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ last value of `col` for a group of rows.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([("a", 1),
+ ... ("a", 2),
+ ... ("a", 3),
+ ... ("b", 8),
+ ... (None, 2)], ["c1", "c2"])
+ >>> df.show()
+ +----+---+
+ | c1| c2|
+ +----+---+
+ | a| 1|
+ | a| 2|
+ | a| 3|
+ | b| 8|
+ |NULL| 2|
+ +----+---+
+ <BLANKLINE>
+ >>> df.select(last_value('c1').alias('a'), last_value('c2').alias('b')).collect()
+ [Row(a=None, b=2)]
+ >>> df.select(last_value('c1', True).alias('a'), last_value('c2', True).alias('b')).collect()
+ [Row(a='b', b=2)]
+ """
+ if ignoreNulls is None:
+ return _invoke_function_over_columns("last_value", col)
+ else:
+ ignoreNulls = lit(ignoreNulls) if isinstance(ignoreNulls, bool) else ignoreNulls
+ return _invoke_function_over_columns("last_value", col, ignoreNulls)
+
+
+@try_remote_functions
+def count_if(col: "ColumnOrName") -> Column:
+ """Returns the number of `TRUE` values for the `col`.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ the number of `TRUE` values for the `col`.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([("a", 1),
+ ... ("a", 2),
+ ... ("a", 3),
+ ... ("b", 8),
+ ... ("b", 2)], ["c1", "c2"])
+ >>> df.show()
+ +---+---+
+ | c1| c2|
+ +---+---+
+ | a| 1|
+ | a| 2|
+ | a| 3|
+ | b| 8|
+ | b| 2|
+ +---+---+
+ <BLANKLINE>
+ >>> df.select(count_if(col('c2') % 2 == 0)).show()
+ +------------------------+
+ |count_if(((c2 % 2) = 0))|
+ +------------------------+
+ | 3|
+ +------------------------+
+ <BLANKLINE>
+ """
+ return _invoke_function_over_columns("count_if", col)
+
+
+@try_remote_functions
+def histogram_numeric(col: "ColumnOrName", nBins: "ColumnOrName") -> Column:
+ """Computes a histogram on numeric 'col' using 'nBins' bins.
+ The return value is an array of (x,y) pairs representing the centers of the
+ histogram's bins. As the value of 'nBins' is increased, the histogram approximation
+ gets finer-grained, but may yield artifacts around outliers. In practice, 20-40
+ histogram bins appear to work well, with more bins being required for skewed or
+ smaller datasets. Note that this function creates a histogram with non-uniform
+ bin widths. It offers no guarantees in terms of the mean-squared-error of the
+ histogram, but in practice is comparable to the histograms produced by the R/S-Plus
+ statistical computing packages. Note: the output type of the 'x' field in the return value is
+ propagated from the input value consumed in the aggregate function.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to work on.
+ nBins : :class:`~pyspark.sql.Column` or str
+ number of histogram bins.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ a histogram on numeric 'col' using 'nBins' bins.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([("a", 1),
+ ... ("a", 2),
+ ... ("a", 3),
+ ... ("b", 8),
+ ... ("b", 2)], ["c1", "c2"])
+ >>> df.show()
+ +---+---+
+ | c1| c2|
+ +---+---+
+ | a| 1|
+ | a| 2|
+ | a| 3|
+ | b| 8|
+ | b| 2|
+ +---+---+
+ <BLANKLINE>
+ >>> df.select(histogram_numeric('c2', lit(5))).show()
+ +------------------------+
+ |histogram_numeric(c2, 5)|
+ +------------------------+
+ | [{1, 1.0}, {2, 1....|
+ +------------------------+
+ <BLANKLINE>
+ """
+ return _invoke_function_over_columns("histogram_numeric", col, nBins)
+
+
@try_remote_functions
def ntile(n: int) -> Column:
"""
@@ -11024,6 +11348,78 @@ def aggregate(
return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge])
+@try_remote_functions
+def reduce(
+ col: "ColumnOrName",
+ initialValue: "ColumnOrName",
+ merge: Callable[[Column, Column], Column],
+ finish: Optional[Callable[[Column], Column]] = None,
+) -> Column:
+ """
+ Applies a binary operator to an initial state and all elements in the array,
+ and reduces this to a single state. The final state is converted into the final result
+ by applying a finish function.
+
+ Both functions can use methods of :class:`~pyspark.sql.Column`, functions defined in
+ :py:mod:`pyspark.sql.functions` and Scala ``UserDefinedFunctions``.
+ Python ``UserDefinedFunctions`` are not supported
+ (`SPARK-27052 <https://issues.apache.org/jira/browse/SPARK-27052>`__).
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ name of column or expression
+ initialValue : :class:`~pyspark.sql.Column` or str
+ initial value. Name of column or expression
+ merge : function
+ a binary function ``(acc: Column, x: Column) -> Column: ...`` returning expression
+ of the same type as ``initialValue``
+ finish : function
+ an optional unary function ``(x: Column) -> Column: ...``
+ used to convert accumulated value.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ final value after aggregate function is applied.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(1, [20.0, 4.0, 2.0, 6.0, 10.0])], ("id", "values"))
+ >>> df.select(reduce("values", lit(0.0), lambda acc, x: acc + x).alias("sum")).show()
+ +----+
+ | sum|
+ +----+
+ |42.0|
+ +----+
+
+ >>> def merge(acc, x):
+ ... count = acc.count + 1
+ ... sum = acc.sum + x
+ ... return struct(count.alias("count"), sum.alias("sum"))
+ >>> df.select(
+ ... reduce(
+ ... "values",
+ ... struct(lit(0).alias("count"), lit(0.0).alias("sum")),
+ ... merge,
+ ... lambda acc: acc.sum / acc.count,
+ ... ).alias("mean")
+ ... ).show()
+ +----+
+ |mean|
+ +----+
+ | 8.4|
+ +----+
+ """
+ if finish is not None:
+ return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge, finish])
+
+ else:
+ return _invoke_higher_order_function("ArrayAggregate", [col, initialValue], [merge])
+
+
@try_remote_functions
def zip_with(
left: "ColumnOrName",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 2f9b458a9d8..d797866664e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -549,6 +549,33 @@ object functions {
*/
def first(columnName: String): Column = first(Column(columnName))
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * @note The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def first_value(e: Column): Column = first(e)
+
+ /**
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first value it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @note The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def first_value(e: Column, ignoreNulls: Column): Column = withAggregateFunction {
+ new First(e.expr, ignoreNulls.expr)
+ }
+
/**
* Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
* or not, returns 1 for aggregated or 0 for not aggregated in the result set.
@@ -724,7 +751,7 @@ object functions {
* @since 2.0.0
*/
def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
- new Last(e.expr, ignoreNulls)
+ Last(e.expr, ignoreNulls)
}
/**
@@ -771,6 +798,33 @@ object functions {
*/
def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * @note The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def last_value(e: Column): Column = last(e)
+
+ /**
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last value it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @note The function is non-deterministic because its result depends on the order of the rows
+ * which may be non-deterministic after a shuffle.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def last_value(e: Column, ignoreNulls: Column): Column = withAggregateFunction {
+ new Last(e.expr, ignoreNulls.expr)
+ }
+
/**
* Aggregate function: returns the most frequent value in a group.
*
@@ -906,6 +960,26 @@ object functions {
}
}
+ /**
+ * Aggregate function: returns the approximate `percentile` of the numeric column `col` which
+ * is the smallest value in the ordered `col` values (sorted from least to greatest) such that
+ * no more than `percentage` of `col` values is less than the value or equal to that value.
+ *
+ * If percentage is an array, each value must be between 0.0 and 1.0.
+ * If it is a single floating point value, it must be between 0.0 and 1.0.
+ *
+ * The accuracy parameter is a positive numeric literal
+ * which controls approximation accuracy at the cost of memory.
+ * Higher value of accuracy yields better accuracy, 1.0/accuracy
+ * is the relative error of the approximation.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def approx_percentile(e: Column, percentage: Column, accuracy: Column): Column = {
+ percentile_approx(e, percentage, accuracy)
+ }
+
/**
* Aggregate function: returns the product of all numerical elements in a group.
*
@@ -1165,6 +1239,50 @@ object functions {
*/
def regr_syy(y: Column, x: Column): Column = withAggregateFunction { RegrSYY(y.expr, x.expr) }
+ /**
+ * Aggregate function: returns some value of `e` for a group of rows.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def any_value(e: Column): Column = withAggregateFunction { new AnyValue(e.expr) }
+
+ /**
+ * Aggregate function: returns some value of `e` for a group of rows.
+ * If `ignoreNulls` is true, returns only non-null values.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def any_value(e: Column, ignoreNulls: Column): Column =
+ withAggregateFunction { new AnyValue(e.expr, ignoreNulls.expr) }
+
+ /**
+ * Aggregate function: returns the number of `TRUE` values for the expression.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def count_if(e: Column): Column = withAggregateFunction { CountIf(e.expr) }
+
+ /**
+ * Aggregate function: computes a histogram on numeric `e` using `nBins` bins.
+ * The return value is an array of (x,y) pairs representing the centers of the
+ * histogram's bins. As the value of `nBins` is increased, the histogram approximation
+ * gets finer-grained, but may yield artifacts around outliers. In practice, 20-40
+ * histogram bins appear to work well, with more bins being required for skewed or
+ * smaller datasets. Note that this function creates a histogram with non-uniform
+ * bin widths. It offers no guarantees in terms of the mean-squared-error of the
+ * histogram, but in practice is comparable to the histograms produced by the R/S-Plus
+ * statistical computing packages. Note: the output type of the 'x' field in the return value is
+ * propagated from the input value consumed in the aggregate function.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def histogram_numeric(e: Column, nBins: Column): Column =
+ withAggregateFunction { new HistogramNumeric(e.expr, nBins.expr) }
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -5030,6 +5148,47 @@ object functions {
def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
aggregate(expr, initialValue, merge, c => c)
+ /**
+ * Applies a binary operator to an initial state and all elements in the array,
+ * and reduces this to a single state. The final state is converted into the final result
+ * by applying a finish function.
+ * {{{
+ * df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10))
+ * }}}
+ *
+ * @param expr the input array column
+ * @param initialValue the initial value
+ * @param merge (combined_value, input_value) => combined_value, the merge function to merge
+ * an input value to the combined_value
+ * @param finish combined_value => final_value, the lambda function to convert the combined value
+ * of all inputs to final result
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def reduce(
+ expr: Column,
+ initialValue: Column,
+ merge: (Column, Column) => Column,
+ finish: Column => Column): Column = aggregate(expr, initialValue, merge, finish)
+
+ /**
+ * Applies a binary operator to an initial state and all elements in the array,
+ * and reduces this to a single state.
+ * {{{
+ * df.select(reduce(col("i"), lit(0), (acc, x) => acc + x))
+ * }}}
+ *
+ * @param expr the input array column
+ * @param initialValue the initial value
+ * @param merge (combined_value, input_value) => combined_value, the merge function to merge
+ * an input value to the combined_value
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def reduce(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
+ aggregate(expr, initialValue, merge, c => c)
+
/**
* Merge two given arrays, element-wise, into a single array using a function.
* If one array is shorter, nulls are appended at the end to match the length of the longer
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 586c3217dab..42ad1176abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1042,6 +1042,29 @@ class DataFrameAggregateSuite extends QueryTest
)
}
+ test("any_value") {
+ checkAnswer(
+ courseSales.groupBy("course").agg(
+ any_value(col("year")),
+ any_value(col("year"), lit(true))
+ ),
+ Row("Java", 2012, 2012) :: Row("dotNET", 2012, 2012) :: Nil
+ )
+ }
+
+ test("approx_percentile") {
+ checkAnswer(
+ courseSales.groupBy("course").agg(
+ approx_percentile(col("earnings"), lit(0.3), lit(10000)),
+ approx_percentile(col("earnings"), array(lit(0.3), lit(0.6)), lit(10000)),
+ approx_percentile(col("earnings"), lit(0.3), lit(1)),
+ approx_percentile(col("earnings"), array(lit(0.3), lit(0.6)), lit(1))
+ ),
+ Row("Java", 20000.0, Seq(20000.0, 30000.0), 20000.0, Seq(20000.0, 20000.0)) ::
+ Row("dotNET", 5000.0, Seq(5000.0, 10000.0), 5000.0, Seq(5000.0, 5000.0)) :: Nil
+ )
+ }
+
test("count_if") {
withTempView("tempView") {
Seq(("a", None), ("a", Some(1)), ("a", Some(2)), ("a", Some(3)),
@@ -1093,6 +1116,44 @@ class DataFrameAggregateSuite extends QueryTest
context = ExpectedContext(fragment = "COUNT_IF(x)", start = 7, stop = 17))
}
}
+
+ checkAnswer(
+ courseSales.groupBy("course").agg(
+ count_if((col("earnings").gt(10000))),
+ count_if(col("course").equalTo("Java").and(col("earnings").gt(10000)))
+ ),
+ Row("Java", 2, 2) :: Row("dotNET", 1, 0) :: Nil
+ )
+ }
+
+ test("first_value") {
+ checkAnswer(
+ nullStrings.orderBy(col("n").desc).agg(
+ first_value(col("s")),
+ first_value(col("s"), lit(true))
+ ),
+ Row(null, "ABC") :: Nil
+ )
+ }
+
+ test("last_value") {
+ checkAnswer(
+ nullStrings.agg(
+ last_value(col("s")),
+ last_value(col("s"), lit(true))
+ ),
+ Row(null, "ABC") :: Nil
+ )
+ }
+
+ test("histogram_numeric") {
+ checkAnswer(
+ courseSales.groupBy("course").agg(
+ histogram_numeric(col("earnings"), lit(5))
+ ),
+ Row("Java", Seq(Row(20000.0, 1.0), Row(30000.0, 1.0))) ::
+ Row("dotNET", Seq(Row(5000.0, 1.0), Row(10000.0, 1.0), Row(48000.0, 1.0))) :: Nil
+ )
}
Seq(true, false).foreach { value =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index d1946469529..01e447895ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -74,8 +74,8 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
)
val excludedSqlFunctions = Set(
- "random", "first_value", "last_value",
- "approx_percentile", "array_agg", "char_length", "character_length",
+ "random",
+ "array_agg", "char_length", "character_length",
"lcase", "position", "printf", "substr", "ucase", "day", "cardinality", "sha",
"getbit",
// aliases for existing functions
@@ -4193,12 +4193,24 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
Row(31),
Row(0),
Row(null)))
+ checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x)),
+ Seq(
+ Row(25),
+ Row(31),
+ Row(0),
+ Row(null)))
checkAnswer(df.select(aggregate(col("i"), lit(0), (acc, x) => acc + x, _ * 10)),
Seq(
Row(250),
Row(310),
Row(0),
Row(null)))
+ checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x, _ * 10)),
+ Seq(
+ Row(250),
+ Row(310),
+ Row(0),
+ Row(null)))
}
// Test with local relation, the Project will be evaluated without codegen
@@ -4239,6 +4251,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
Row(null),
Row(0),
Row(null)))
+ checkAnswer(df.select(reduce(col("i"), lit(0), (acc, x) => acc + x)),
+ Seq(
+ Row(25),
+ Row(null),
+ Row(0),
+ Row(null)))
checkAnswer(
df.select(
aggregate(col("i"), lit(0), (acc, x) => acc + x, acc => coalesce(acc, lit(0)) * 10)),
@@ -4247,6 +4265,14 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
Row(0),
Row(0),
Row(null)))
+ checkAnswer(
+ df.select(
+ reduce(col("i"), lit(0), (acc, x) => acc + x, acc => coalesce(acc, lit(0)) * 10)),
+ Seq(
+ Row(250),
+ Row(0),
+ Row(0),
+ Row(null)))
}
// Test with local relation, the Project will be evaluated without codegen
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org