Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/30 05:36:15 UTC
[spark] branch master updated: [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new de8ec74f282 [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python
de8ec74f282 is described below
commit de8ec74f2826db3815275b4fccef186f22c85833
Author: Tengfei Huang <te...@gmail.com>
AuthorDate: Thu Jun 29 22:35:31 2023 -0700
[SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python
### What changes were proposed in this pull request?
Add the following functions:
- array_agg
- array_size
- cardinality
- count_min_sketch
- named_struct
- json_array_length
- json_object_keys
- mask
To:
- Scala API
- Python API
- Spark Connect Scala Client
- Spark Connect Python Client
### Why are the changes needed?
Add Scala, Python and Connect APIs for these SQL functions: array_agg, array_size, cardinality, count_min_sketch, named_struct, json_array_length, json_object_keys, mask
### Does this PR introduce _any_ user-facing change?
Yes, added new functions.
### How was this patch tested?
New unit tests added.
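For a quick feel of the new surface area, here is a minimal PySpark sketch (illustrative only: it assumes a Spark 3.5.0 session bound to `spark`, and the data and column names are made up; expected values follow the docstring examples added in this commit):

from pyspark.sql import SparkSession
from pyspark.sql.functions import (array_agg, array_size, cardinality,
                                   json_array_length, json_object_keys,
                                   lit, mask, named_struct)

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Ab1-", [10, 20])], ["s", "xs"])

df.agg(array_agg("xs").alias("r")).first()                 # Row(r=[[10, 20]])
df.select(array_size("xs"), cardinality("xs")).first()     # both 2
df.select(json_array_length(lit('[1, 2, 3]'))).first()     # 3
df.select(json_object_keys(lit('{"a":1,"b":2}'))).first()  # ['a', 'b']
df.select(named_struct(lit("x"), "s")).first()             # inner struct Row(x='Ab1-')
df.select(mask("s")).first()                               # 'Xxn-' (upper->X, lower->x, digit->n)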
Closes #41718 from ivoson/SPARK-43926.
Lead-authored-by: Tengfei Huang <te...@gmail.com>
Co-authored-by: Tengfei Huang <te...@microsoft.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../scala/org/apache/spark/sql/functions.scala | 159 +++++++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 48 ++++
.../explain-results/function_array_agg.explain | 2 +
.../explain-results/function_array_size.explain | 2 +
.../explain-results/function_cardinality.explain | 2 +
.../function_count_min_sketch.explain | 2 +
.../function_json_array_length.explain | 2 +
.../function_json_object_keys.explain | 2 +
.../explain-results/function_mask.explain | 2 +
.../function_mask_with_specific_upperChar.explain | 2 +
..._mask_with_specific_upperChar_lowerChar.explain | 2 +
..._specific_upperChar_lowerChar_digitChar.explain | 2 +
...upperChar_lowerChar_digitChar_otherChar.explain | 2 +
.../explain-results/function_named_struct.explain | 2 +
.../query-tests/queries/function_array_agg.json | 25 ++
.../queries/function_array_agg.proto.bin | Bin 0 -> 178 bytes
.../query-tests/queries/function_array_size.json | 25 ++
.../queries/function_array_size.proto.bin | Bin 0 -> 179 bytes
.../query-tests/queries/function_cardinality.json | 25 ++
.../queries/function_cardinality.proto.bin | Bin 0 -> 180 bytes
.../queries/function_count_min_sketch.json | 37 +++
.../queries/function_count_min_sketch.proto.bin | Bin 0 -> 217 bytes
.../queries/function_json_array_length.json | 25 ++
.../queries/function_json_array_length.proto.bin | Bin 0 -> 186 bytes
.../queries/function_json_object_keys.json | 25 ++
.../queries/function_json_object_keys.proto.bin | Bin 0 -> 185 bytes
.../query-tests/queries/function_mask.json | 25 ++
.../query-tests/queries/function_mask.proto.bin | Bin 0 -> 173 bytes
.../function_mask_with_specific_upperChar.json | 29 +++
...function_mask_with_specific_upperChar.proto.bin | Bin 0 -> 180 bytes
...ion_mask_with_specific_upperChar_lowerChar.json | 33 +++
...ask_with_specific_upperChar_lowerChar.proto.bin | Bin 0 -> 187 bytes
...ith_specific_upperChar_lowerChar_digitChar.json | 37 +++
...pecific_upperChar_lowerChar_digitChar.proto.bin | Bin 0 -> 194 bytes
...ic_upperChar_lowerChar_digitChar_otherChar.json | 41 ++++
...perChar_lowerChar_digitChar_otherChar.proto.bin | Bin 0 -> 201 bytes
.../query-tests/queries/function_named_struct.json | 37 +++
.../queries/function_named_struct.proto.bin | Bin 0 -> 203 bytes
.../source/reference/pyspark.sql/functions.rst | 8 +
python/pyspark/sql/connect/functions.py | 74 ++++++
python/pyspark/sql/functions.py | 254 +++++++++++++++++++++
.../scala/org/apache/spark/sql/functions.scala | 180 +++++++++++++++
.../spark/sql/CountMinSketchAggQuerySuite.scala | 19 ++
.../apache/spark/sql/DataFrameAggregateSuite.scala | 12 +
.../apache/spark/sql/DataFrameFunctionsSuite.scala | 85 ++++++-
.../org/apache/spark/sql/JsonFunctionsSuite.scala | 21 ++
46 files changed, 1247 insertions(+), 1 deletion(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index ed0c13b2145..2c1a966dc71 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -137,6 +137,15 @@ object functions {
case s: Symbol => new Column(s.name)
case _ => createLiteral(create(literal))
}
+
+ /**
+ * Creates a struct with the given field names and values.
+ *
+ * @group normal_funcs
+ * @since 3.5.0
+ */
+ def named_struct(cols: Column*): Column = Column.fn("named_struct", cols: _*)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Sort functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -1275,6 +1284,29 @@ object functions {
*/
def bit_xor(e: Column): Column = Column.fn("bit_xor", e)
+ /**
+ * Aggregate function: returns a list of objects with duplicates.
+ *
+ * @note
+ * The function is non-deterministic because the order of collected results depends on the
+ * order of the rows which may be non-deterministic after a shuffle.
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def array_agg(e: Column): Column = Column.fn("array_agg", e)
+
+ /**
+ * Returns a count-min sketch of a column with the given eps, confidence and seed. The result is
+ * an array of bytes, which can be deserialized to a `CountMinSketch` before usage. Count-min
+ * sketch is a probabilistic data structure used for frequency estimation using sub-linear
+ * space.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def count_min_sketch(e: Column, eps: Column, confidence: Column, seed: Column): Column =
+ Column.fn("count_min_sketch", e, eps, confidence, seed)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Window functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -4454,6 +4486,94 @@ object functions {
*/
def right(str: Column, len: Column): Column = Column.fn("right", str, len)
+ /**
+ * Masks the given string value. The function replaces upper-case characters with 'X', lower-case
+ * characters with 'x', and digits with 'n'. This can be useful for creating copies of tables
+ * with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column): Column = Column.fn("mask", input)
+
+ /**
+ * Masks the given string value. The function replaces upper-case characters with the specified
+ * character, lower-case characters with 'x', and digits with 'n'. This can be useful for
+ * creating copies of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column): Column =
+ Column.fn("mask", input, upperChar)
+
+ /**
+ * Masks the given string value. The function replaces upper-case and lower-case characters with
+ * the specified characters, and digits with 'n'. This can be useful for creating copies of
+ * tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column, lowerChar: Column): Column =
+ Column.fn("mask", input, upperChar, lowerChar)
+
+ /**
+ * Masks the given string value. The function replaces upper-case characters, lower-case
+ * characters and digits with the specified characters. This can be useful for creating copies
+ * of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ * @param digitChar
+ * character to replace digit characters with. Specify NULL to retain original character.
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column, lowerChar: Column, digitChar: Column): Column =
+ Column.fn("mask", input, upperChar, lowerChar, digitChar)
+
+ /**
+ * Masks the given string value. This can be useful for creating copies of tables with sensitive
+ * information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ * @param digitChar
+ * character to replace digit characters with. Specify NULL to retain original character.
+ * @param otherChar
+ * character to replace all other characters with. Specify NULL to retain original character.
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(
+ input: Column,
+ upperChar: Column,
+ lowerChar: Column,
+ digitChar: Column,
+ otherChar: Column): Column =
+ Column.fn("mask", input, upperChar, lowerChar, digitChar, otherChar)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// DateTime functions
//////////////////////////////////////////////////////////////////////////////////////////////
@@ -6927,6 +7047,45 @@ object functions {
*/
def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
+ /**
+ * Returns the total number of elements in the array. The function returns null for null input.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def array_size(e: Column): Column = Column.fn("array_size", e)
+
+ /**
+ * Returns length of array or map. This is an alias of `size` function.
+ *
+ * The function returns null for null input if spark.sql.legacy.sizeOfNull is set to false or
+ * spark.sql.ansi.enabled is set to true. Otherwise, the function returns -1 for null input.
+ * With the default settings, the function returns -1 for null input.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def cardinality(e: Column): Column = Column.fn("cardinality", e)
+
+ /**
+ * Returns the number of elements in the outermost JSON array. `NULL` is returned for any other
+ * valid JSON string, for `NULL` input, or for an invalid JSON string.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def json_array_length(e: Column): Column = Column.fn("json_array_length", e)
+
+ /**
+ * Returns all the keys of the outermost JSON object as an array. If the input is a valid JSON
+ * object, the keys of that outermost object are returned as an array. For any other valid JSON
+ * string, an invalid JSON string or an empty string, the function returns null.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def json_object_keys(e: Column): Column = Column.fn("json_object_keys", e)
+
//////////////////////////////////////////////////////////////////////////////////////////////
// Partition Transforms functions
//////////////////////////////////////////////////////////////////////////////////////////////
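A note on count_min_sketch, since it is the least self-describing of the batch: the eps, confidence and seed arguments are expected to be literal (foldable) expressions, and the returned bytes are meant to be read back on the JVM side, typically via org.apache.spark.util.sketch.CountMinSketch.readFrom. A hedged PySpark sketch (session and data are assumptions):

from pyspark.sql.functions import count_min_sketch, hex, lit

df = spark.range(1000)
# Smaller eps -> wider sketch (lower relative error, more space);
# higher confidence -> more hash rows; a fixed seed fixes the hash
# functions, so the bytes are deterministic for a given input.
sketch = df.agg(count_min_sketch("id", lit(0.1), lit(0.95), lit(11)).alias("cms"))
sketch.select(hex("cms")).show(truncate=False)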
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index ecb7092b8d9..875f10872a2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2698,6 +2698,54 @@ class PlanGenerationTestSuite
fn.right(fn.col("g"), fn.col("g"))
}
+ functionTest("array_agg") {
+ fn.array_agg(fn.col("a"))
+ }
+
+ functionTest("array_size") {
+ fn.array_size(fn.col("e"))
+ }
+
+ functionTest("cardinality") {
+ fn.cardinality(fn.col("f"))
+ }
+
+ functionTest("count_min_sketch") {
+ fn.count_min_sketch(fn.col("a"), fn.lit(0.1), fn.lit(0.95), fn.lit(11))
+ }
+
+ functionTest("named_struct") {
+ fn.named_struct(fn.lit("x"), fn.col("a"), fn.lit("y"), fn.col("id"))
+ }
+
+ functionTest("json_array_length") {
+ fn.json_array_length(fn.col("g"))
+ }
+
+ functionTest("json_object_keys") {
+ fn.json_object_keys(fn.col("g"))
+ }
+
+ functionTest("mask with specific upperChar lowerChar digitChar otherChar") {
+ fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'), fn.lit('n'), fn.lit('*'))
+ }
+
+ functionTest("mask with specific upperChar lowerChar digitChar") {
+ fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'), fn.lit('n'))
+ }
+
+ functionTest("mask with specific upperChar lowerChar") {
+ fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'))
+ }
+
+ functionTest("mask with specific upperChar") {
+ fn.mask(fn.col("g"), fn.lit('X'))
+ }
+
+ functionTest("mask") {
+ fn.mask(fn.col("g"))
+ }
+
functionTest("aes_encrypt with mode padding iv aad") {
fn.aes_encrypt(
fn.col("g"),
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain
new file mode 100644
index 00000000000..102f736c62e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain
@@ -0,0 +1,2 @@
+Aggregate [collect_list(a#0, 0, 0) AS collect_list(a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain
new file mode 100644
index 00000000000..faf923d68d6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain
@@ -0,0 +1,2 @@
+Project [size(e#0, false) AS array_size(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain
new file mode 100644
index 00000000000..3392c67cda6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain
@@ -0,0 +1,2 @@
+Project [cardinality(f#0, true) AS cardinality(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain
new file mode 100644
index 00000000000..4baee060c91
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain
@@ -0,0 +1,2 @@
+Aggregate [count_min_sketch(a#0, 0.1, 0.95, 11, 0, 0) AS count_min_sketch(a, 0.1, 0.95, 11)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain
new file mode 100644
index 00000000000..50ab91560e6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain
@@ -0,0 +1,2 @@
+Project [json_array_length(g#0) AS json_array_length(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain
new file mode 100644
index 00000000000..30153bb192e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain
@@ -0,0 +1,2 @@
+Project [json_object_keys(g#0) AS json_object_keys(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain
new file mode 100644
index 00000000000..9f3f378f0ab
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, *) AS mask(g, X, x, n, *)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain
new file mode 100644
index 00000000000..bafbd5a650f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain
@@ -0,0 +1,2 @@
+Project [named_struct(x, a#0, y, id#0L) AS named_struct(x, a, y, id)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json
new file mode 100644
index 00000000000..a3197ce9506
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "array_agg",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin
new file mode 100644
index 00000000000..c7306df8621
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json
new file mode 100644
index 00000000000..c1c618bc7f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "array_size",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "e"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin
new file mode 100644
index 00000000000..47949dfbbda
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json
new file mode 100644
index 00000000000..e2b3dd04287
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "cardinality",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "f"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin
new file mode 100644
index 00000000000..54c8cfe8434
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json
new file mode 100644
index 00000000000..94be79dcc33
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json
@@ -0,0 +1,37 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "count_min_sketch",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "literal": {
+ "double": 0.1
+ }
+ }, {
+ "literal": {
+ "double": 0.95
+ }
+ }, {
+ "literal": {
+ "integer": 11
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin
new file mode 100644
index 00000000000..11bcae8062e
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json
new file mode 100644
index 00000000000..36223a451e3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "json_array_length",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin
new file mode 100644
index 00000000000..817c803d830
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json
new file mode 100644
index 00000000000..f8667a1012a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "json_object_keys",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin
new file mode 100644
index 00000000000..4be9477ec91
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json
new file mode 100644
index 00000000000..c0473466a3e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json
@@ -0,0 +1,25 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "mask",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin
new file mode 100644
index 00000000000..5e94c267593
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json
new file mode 100644
index 00000000000..571d514e72d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json
@@ -0,0 +1,29 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "mask",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "string": "X"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin
new file mode 100644
index 00000000000..0f6c4b579c4
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json
new file mode 100644
index 00000000000..ae527d70cf1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json
@@ -0,0 +1,33 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "mask",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "string": "X"
+ }
+ }, {
+ "literal": {
+ "string": "x"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin
new file mode 100644
index 00000000000..5a6b4d7caa6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json
new file mode 100644
index 00000000000..e7fee11d316
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json
@@ -0,0 +1,37 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "mask",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "string": "X"
+ }
+ }, {
+ "literal": {
+ "string": "x"
+ }
+ }, {
+ "literal": {
+ "string": "n"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin
new file mode 100644
index 00000000000..f0a2e7cb643
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json
new file mode 100644
index 00000000000..d6076ae558b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json
@@ -0,0 +1,41 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "mask",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "g"
+ }
+ }, {
+ "literal": {
+ "string": "X"
+ }
+ }, {
+ "literal": {
+ "string": "x"
+ }
+ }, {
+ "literal": {
+ "string": "n"
+ }
+ }, {
+ "literal": {
+ "string": "*"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin
new file mode 100644
index 00000000000..cb5f090361b
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json
new file mode 100644
index 00000000000..c4d92131ed0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json
@@ -0,0 +1,37 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedFunction": {
+ "functionName": "named_struct",
+ "arguments": [{
+ "literal": {
+ "string": "x"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "literal": {
+ "string": "y"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin
new file mode 100644
index 00000000000..b595cfc2820
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 8e8561920c5..c0de3b233fb 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -35,6 +35,7 @@ Normal Functions
isnan
isnull
monotonically_increasing_id
+ named_struct
nanvl
rand
randn
@@ -202,6 +203,7 @@ Collection Functions
array_position
element_at
array_append
+ array_size
array_sort
array_insert
array_remove
@@ -234,7 +236,10 @@ Collection Functions
from_json
schema_of_json
to_json
+ json_array_length
+ json_object_keys
size
+ cardinality
struct
sort_array
array_max
@@ -279,6 +284,7 @@ Aggregate Functions
approxCountDistinct
approx_count_distinct
approx_percentile
+ array_agg
avg
bit_and
bit_or
@@ -291,6 +297,7 @@ Aggregate Functions
count
count_distinct
countDistinct
+ count_min_sketch
count_if
covar_pop
covar_samp
@@ -402,6 +409,7 @@ String Functions
locate
lpad
ltrim
+ mask
octet_length
parse_url
position
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 134465a7882..697f969b0b0 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -954,6 +954,13 @@ def collect_list(col: "ColumnOrName") -> Column:
collect_list.__doc__ = pysparkfuncs.collect_list.__doc__
+def array_agg(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("array_agg", col)
+
+
+array_agg.__doc__ = pysparkfuncs.array_agg.__doc__
+
+
def collect_set(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("collect_set", col)
@@ -1025,6 +1032,18 @@ def grouping_id(*cols: "ColumnOrName") -> Column:
grouping_id.__doc__ = pysparkfuncs.grouping_id.__doc__
+def count_min_sketch(
+ col: "ColumnOrName",
+ eps: "ColumnOrName",
+ confidence: "ColumnOrName",
+ seed: "ColumnOrName",
+) -> Column:
+ return _invoke_function_over_columns("count_min_sketch", col, eps, confidence, seed)
+
+
+count_min_sketch.__doc__ = pysparkfuncs.count_min_sketch.__doc__
+
+
def kurtosis(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("kurtosis", col)
@@ -1593,6 +1612,20 @@ def array_min(col: "ColumnOrName") -> Column:
array_min.__doc__ = pysparkfuncs.array_min.__doc__
+def array_size(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("array_size", col)
+
+
+array_size.__doc__ = pysparkfuncs.array_size.__doc__
+
+
+def cardinality(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("cardinality", col)
+
+
+cardinality.__doc__ = pysparkfuncs.cardinality.__doc__
+
+
def array_position(col: "ColumnOrName", value: Any) -> Column:
return _invoke_function("array_position", _to_col(col), lit(value))
@@ -1799,6 +1832,20 @@ def get_json_object(col: "ColumnOrName", path: str) -> Column:
get_json_object.__doc__ = pysparkfuncs.get_json_object.__doc__
+def json_array_length(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("json_array_length", col)
+
+
+json_array_length.__doc__ = pysparkfuncs.json_array_length.__doc__
+
+
+def json_object_keys(col: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("json_object_keys", col)
+
+
+json_object_keys.__doc__ = pysparkfuncs.json_object_keys.__doc__
+
+
def inline(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("inline", col)
@@ -2039,6 +2086,13 @@ def struct(
struct.__doc__ = pysparkfuncs.struct.__doc__
+def named_struct(*cols: "ColumnOrName") -> Column:
+ return _invoke_function_over_columns("named_struct", *cols)
+
+
+named_struct.__doc__ = pysparkfuncs.named_struct.__doc__
+
+
def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
if options is None:
return _invoke_function("to_csv", _to_col(col))
@@ -2667,6 +2721,26 @@ def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
right.__doc__ = pysparkfuncs.right.__doc__
+def mask(
+ col: "ColumnOrName",
+ upperChar: Optional["ColumnOrName"] = None,
+ lowerChar: Optional["ColumnOrName"] = None,
+ digitChar: Optional["ColumnOrName"] = None,
+ otherChar: Optional["ColumnOrName"] = None,
+) -> Column:
+ _upperChar = lit("X") if upperChar is None else upperChar
+ _lowerChar = lit("x") if lowerChar is None else lowerChar
+ _digitChar = lit("n") if digitChar is None else digitChar
+ _otherChar = lit(None) if otherChar is None else otherChar
+
+ return _invoke_function_over_columns(
+ "mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
+ )
+
+
+mask.__doc__ = pysparkfuncs.mask.__doc__
+
+
# Date/Timestamp functions
# TODO(SPARK-41455): Resolve dtypes inconsistencies for:
# to_timestamp, from_utc_timestamp, to_utc_timestamp,
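Note the default-filling in the Connect Python mask above: the client always sends five arguments, substituting lit("X"), lit("x"), lit("n") and lit(None) for the omitted ones. (The Scala clients instead send only the supplied arguments and let the server-side Mask expression fill the same defaults, which is why the mask golden explain files above all resolve to mask(g, X, x, n, NULL).) A small sketch of the equivalence, assuming a Connect session and a DataFrame df with a string column g:

from pyspark.sql.connect.functions import lit, mask

# These two calls build identical plans under the Python Connect client:
df.select(mask("g"))
df.select(mask("g", lit("X"), lit("x"), lit("n"), lit(None)))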
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index c753b59bec5..c28e837759c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3145,6 +3145,32 @@ def collect_list(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("collect_list", col)
+@try_remote_functions
+def array_agg(col: "ColumnOrName") -> Column:
+ """
+ Aggregate function: returns a list of objects with duplicates.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ list of objects with duplicates.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([[1], [1], [2]], ["c"])
+ >>> df.agg(array_agg('c').alias('r')).collect()
+ [Row(r=[1, 1, 2])]
+ """
+ return _invoke_function_over_columns("array_agg", col)
+
+
@try_remote_functions
def collect_set(col: "ColumnOrName") -> Column:
"""
@@ -4069,6 +4095,47 @@ def grouping_id(*cols: "ColumnOrName") -> Column:
return _invoke_function_over_seq_of_columns("grouping_id", cols)
+@try_remote_functions
+def count_min_sketch(
+ col: "ColumnOrName",
+ eps: "ColumnOrName",
+ confidence: "ColumnOrName",
+ seed: "ColumnOrName",
+) -> Column:
+ """
+ Returns a count-min sketch of a column with the given eps, confidence and seed.
+ The result is an array of bytes, which can be deserialized to a `CountMinSketch` before usage.
+ Count-min sketch is a probabilistic data structure used for frequency estimation
+ using sub-linear space.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+ eps : :class:`~pyspark.sql.Column` or str
+ relative error, must be positive
+ confidence : :class:`~pyspark.sql.Column` or str
+ confidence, must be positive and less than 1.0
+ seed : :class:`~pyspark.sql.Column` or str
+ random seed
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ count-min sketch of the column
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([[1], [2], [1]], ['data'])
+ >>> df = df.agg(count_min_sketch(df.data, lit(0.5), lit(0.5), lit(1)).alias('sketch'))
+ >>> df.select(hex(df.sketch).alias('r')).collect()
+ [Row(r='0000000100000000000000030000000100000004000000005D8D6AB90000000000000000000000000000000200000000000000010000000000000000')]
+ """
+ return _invoke_function_over_columns("count_min_sketch", col, eps, confidence, seed)
+
+
@try_remote_functions
def input_file_name() -> Column:
"""
@@ -4893,6 +4960,31 @@ def struct(
return _invoke_function_over_seq_of_columns("struct", cols) # type: ignore[arg-type]
+@try_remote_functions
+def named_struct(*cols: "ColumnOrName") -> Column:
+ """
+ Creates a struct with the given field names and values.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ cols : :class:`~pyspark.sql.Column` or str
+ list of columns to work on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(1, 2, 3)], ['a', 'b', 'c'])
+ >>> df.select(named_struct(lit('x'), df.a, lit('y'), df.b).alias('r')).collect()
+ [Row(r=Row(x=1, y=2))]
+ """
+ return _invoke_function_over_seq_of_columns("named_struct", cols)
+
+
@try_remote_functions
def greatest(*cols: "ColumnOrName") -> Column:
"""
@@ -10542,6 +10634,61 @@ def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
return _invoke_function_over_columns("right", str, len)
+@try_remote_functions
+def mask(
+ col: "ColumnOrName",
+ upperChar: Optional["ColumnOrName"] = None,
+ lowerChar: Optional["ColumnOrName"] = None,
+ digitChar: Optional["ColumnOrName"] = None,
+ otherChar: Optional["ColumnOrName"] = None,
+) -> Column:
+ """
+ Masks the given string value. This can be useful for creating copies of tables with sensitive
+ information removed.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col: :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+ upperChar: :class:`~pyspark.sql.Column` or str
+ character to replace upper-case characters with. Specify NULL to retain original character.
+ lowerChar: :class:`~pyspark.sql.Column` or str
+ character to replace lower-case characters with. Specify NULL to retain original character.
+ digitChar: :class:`~pyspark.sql.Column` or str
+ character to replace digit characters with. Specify NULL to retain original character.
+ otherChar: :class:`~pyspark.sql.Column` or str
+ character to replace all other characters with. Specify NULL to retain original character.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([("AbCD123-@$#",), ("abcd-EFGH-8765-4321",)], ['data'])
+ >>> df.select(mask(df.data).alias('r')).collect()
+ [Row(r='XxXXnnn-@$#'), Row(r='xxxx-XXXX-nnnn-nnnn')]
+ >>> df.select(mask(df.data, lit('Y')).alias('r')).collect()
+ [Row(r='YxYYnnn-@$#'), Row(r='xxxx-YYYY-nnnn-nnnn')]
+ >>> df.select(mask(df.data, lit('Y'), lit('y')).alias('r')).collect()
+ [Row(r='YyYYnnn-@$#'), Row(r='yyyy-YYYY-nnnn-nnnn')]
+ >>> df.select(mask(df.data, lit('Y'), lit('y'), lit('d')).alias('r')).collect()
+ [Row(r='YyYYddd-@$#'), Row(r='yyyy-YYYY-dddd-dddd')]
+ >>> df.select(mask(df.data, lit('Y'), lit('y'), lit('d'), lit('*')).alias('r')).collect()
+ [Row(r='YyYYddd****'), Row(r='yyyy*YYYY*dddd*dddd')]
+ """
+
+ _upperChar = lit("X") if upperChar is None else upperChar
+ _lowerChar = lit("x") if lowerChar is None else lowerChar
+ _digitChar = lit("n") if digitChar is None else digitChar
+ _otherChar = lit(None) if otherChar is None else otherChar
+ return _invoke_function_over_columns(
+ "mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
+ )
+
+
# ---------------------- Collection functions ------------------------------
@@ -11864,6 +12011,61 @@ def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = Non
return _invoke_function("schema_of_json", col, _options_to_str(options))
+@try_remote_functions
+def json_array_length(col: "ColumnOrName") -> Column:
+ """
+ Returns the number of elements in the outermost JSON array. `NULL` is returned for any
+ other valid JSON string, for `NULL` input, or for an invalid JSON string.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col: :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ length of json array.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(None,), ('[1, 2, 3]',), ('[]',)], ['data'])
+ >>> df.select(json_array_length(df.data).alias('r')).collect()
+ [Row(r=None), Row(r=3), Row(r=0)]
+ """
+ return _invoke_function_over_columns("json_array_length", col)
+
+
+@try_remote_functions
+def json_object_keys(col: "ColumnOrName") -> Column:
+ """
+ Returns all the keys of the outermost JSON object as an array. If the input is a valid
+ JSON object, the keys of that outermost object are returned as an array. For any other
+ valid JSON string, an invalid JSON string or an empty string, the function returns null.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col: :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ all the keys of the outermost JSON object.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([(None,), ('{}',), ('{"key1":1, "key2":2}',)], ['data'])
+ >>> df.select(json_object_keys(df.data).alias('r')).collect()
+ [Row(r=None), Row(r=[]), Row(r=['key1', 'key2'])]
+ """
+ return _invoke_function_over_columns("json_object_keys", col)
+
+
@try_remote_functions
def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
"""
@@ -12037,6 +12239,58 @@ def array_max(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("array_max", col)
+@try_remote_functions
+def array_size(col: "ColumnOrName") -> Column:
+ """
+ Returns the total number of elements in the array. The function returns null for null input.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ total number of elements in the array.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([([2, 1, 3],), (None,)], ['data'])
+ >>> df.select(array_size(df.data).alias('r')).collect()
+ [Row(r=3), Row(r=None)]
+ """
+ return _invoke_function_over_columns("array_size", col)
+
+
+@try_remote_functions
+def cardinality(col: "ColumnOrName") -> Column:
+ """
+ Collection function: returns the length of the array or map stored in the column.
+
+ .. versionadded:: 3.5.0
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ target column to compute on.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ length of the array/map.
+
+ Examples
+ --------
+ >>> df = spark.createDataFrame([([1, 2, 3],), ([1],), ([],)], ['data'])
+ >>> df.select(cardinality(df.data).alias('r')).collect()
+ [Row(r=3), Row(r=1), Row(r=0)]
+ """
+ return _invoke_function_over_columns("cardinality", col)
+
+
@try_remote_functions
def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
"""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index b01bf1c6a3c..1feeffea267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -368,6 +368,23 @@ object functions {
*/
def collect_set(columnName: String): Column = collect_set(Column(columnName))
+ /**
+ * Returns a count-min sketch of a column with the given eps, confidence and seed. The result
+ * is an array of bytes, which can be deserialized to a `CountMinSketch` before usage.
+ * Count-min sketch is a probabilistic data structure used for frequency estimation using
+ * sub-linear space.
+ *
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def count_min_sketch(
+ e: Column,
+ eps: Column,
+ confidence: Column,
+ seed: Column): Column = withAggregateFunction {
+ new CountMinSketchAgg(e.expr, eps.expr, confidence.expr, seed.expr)
+ }
+
private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =
withAggregateFunction { CollectTopK(e.expr, num, reverse) }
@@ -1624,6 +1641,14 @@ object functions {
@scala.annotation.varargs
def map(cols: Column*): Column = withExpr { CreateMap(cols.map(_.expr)) }
+ /**
+ * Creates a struct with the given field names and values.
+ *
+ * @group normal_funcs
+ * @since 3.5.0
+ */
+ def named_struct(cols: Column*): Column = withExpr { CreateNamedStruct(cols.map(_.expr)) }
+
/**
* Creates a new map column. The array in the first column is used for keys. The array in the
* second column is used for values. All elements in the array for key should not be null.
@@ -6611,6 +6636,29 @@ object functions {
withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
}
+ /**
+ * Returns the number of elements in the outermost JSON array. `NULL` is returned for any
+ * other valid JSON string, for `NULL` input, or for an invalid JSON string.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def json_array_length(jsonArray: Column): Column = withExpr {
+ LengthOfJsonArray(jsonArray.expr)
+ }
+
+ /**
+ * Returns all the keys of the outermost JSON object as an array. If the input is a valid
+ * JSON object, the keys of that outermost object are returned as an array. For any other
+ * valid JSON string, an invalid JSON string or an empty string, the function returns null.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def json_object_keys(json: Column): Column = withExpr {
+ JsonObjectKeys(json.expr)
+ }
+
// scalastyle:off line.size.limit
/**
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
@@ -6671,6 +6719,108 @@ object functions {
def to_json(e: Column): Column =
to_json(e, Map.empty[String, String])
+ /**
+ * Masks the given string value. The function replaces upper-case characters with 'X',
+ * lower-case characters with 'x', and digits with 'n'.
+ * This can be useful for creating copies of tables with sensitive information removed.
+ *
+ * @param input string value to mask. Supported types: STRING, VARCHAR, CHAR
+ *
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column): Column = withExpr {
+ new Mask(input.expr)
+ }
+
+ /**
+ * Masks the given string value. The function replaces upper-case characters with the specified
+ * character, lower-case characters with 'x', and digits with 'n'.
+ * This can be useful for creating copies of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ *
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column): Column = withExpr {
+ new Mask(input.expr, upperChar.expr)
+ }
+
+ /**
+ * Masks the given string value. The function replaces upper-case and lower-case characters with
+ * the characters specified, respectively, and digits with 'n'.
+ * This can be useful for creating copies of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ *
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column, lowerChar: Column): Column = withExpr {
+ new Mask(input.expr, upperChar.expr, lowerChar.expr)
+ }
+
+ /**
+ * Masks the given string value. The function replaces upper-case characters, lower-case
+ * characters and digits with the characters specified, respectively.
+ * This can be useful for creating copies of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ * @param digitChar
+ * character to replace digit characters with. Specify NULL to retain original character.
+ *
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(input: Column, upperChar: Column, lowerChar: Column, digitChar: Column): Column = {
+ withExpr {
+ new Mask(input.expr, upperChar.expr, lowerChar.expr, digitChar.expr)
+ }
+ }
+
+ /**
+ * Masks the given string value, replacing upper-case characters, lower-case characters,
+ * digits, and all other characters with the respective specified characters. This can be
+ * useful for creating copies of tables with sensitive information removed.
+ *
+ * @param input
+ * string value to mask. Supported types: STRING, VARCHAR, CHAR
+ * @param upperChar
+ * character to replace upper-case characters with. Specify NULL to retain original character.
+ * @param lowerChar
+ * character to replace lower-case characters with. Specify NULL to retain original character.
+ * @param digitChar
+ * character to replace digit characters with. Specify NULL to retain original character.
+ * @param otherChar
+ * character to replace all other characters with. Specify NULL to retain original character.
+ *
+ * @group string_funcs
+ * @since 3.5.0
+ */
+ def mask(
+ input: Column,
+ upperChar: Column,
+ lowerChar: Column,
+ digitChar: Column,
+ otherChar: Column): Column = {
+ withExpr {
+ Mask(input.expr, upperChar.expr, lowerChar.expr, digitChar.expr, otherChar.expr)
+ }
+ }
+
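
A minimal sketch of how these overloads compose (e.g. in spark-shell; the column name `card` is invented, and the expected outputs follow the Scaladoc above):

    import org.apache.spark.sql.functions.{lit, mask}

    val df = Seq("AbCD123-@$#").toDF("card")

    df.select(mask($"card")).show()            // XxXXnnn-@$#
    df.select(mask($"card", lit('Q'))).show()  // QxQQnnn-@$#

    // NULL for the first three classes retains them; '*' replaces everything else.
    df.select(mask($"card", lit(null), lit(null), lit(null), lit('*'))).show()  // AbCD123****
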
/**
* Returns length of array or map.
*
@@ -6683,6 +6833,18 @@ object functions {
*/
def size(e: Column): Column = withExpr { Size(e.expr) }
+ /**
+ * Returns the length of the array or map. This is an alias of the `size` function.
+ *
+ * The function returns null for null input if spark.sql.legacy.sizeOfNull is set to false or
+ * spark.sql.ansi.enabled is set to true. Otherwise, the function returns -1 for null input.
+ * With the default settings, the function returns -1 for null input.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def cardinality(e: Column): Column = size(e)
+
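
Since cardinality merely delegates to size, it inherits the same null semantics; a small sketch under the default configuration (e.g. in spark-shell; the column name `xs` is invented):

    import org.apache.spark.sql.functions.{cardinality, size}

    val df = Seq(Seq(1, 2, 3), null).toDF("xs")

    // With spark.sql.legacy.sizeOfNull=true and ANSI mode off (the defaults),
    // both columns yield 3 for the first row and -1 for the null row.
    df.select(size($"xs"), cardinality($"xs")).show()
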
/**
* Sorts the input array for the given column in ascending order,
* according to the natural ordering of the array elements.
@@ -6723,6 +6885,24 @@ object functions {
*/
def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }
+ /**
+ * Returns the total number of elements in the array. The function returns null for null input.
+ *
+ * @group collection_funcs
+ * @since 3.5.0
+ */
+ def array_size(e: Column): Column = withExpr { ArraySize(e.expr) }
+
+ /**
+ * Aggregate function: returns a list of objects with duplicates.
+ *
+ * @note The function is non-deterministic because the order of collected results depends
+ * on the order of the rows which may be non-deterministic after a shuffle.
+ * @group agg_funcs
+ * @since 3.5.0
+ */
+ def array_agg(e: Column): Column = collect_list(e)
+
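
Unlike size, array_size returns null for a null array, and array_agg is collect_list under its SQL-standard name; a hedged sketch (e.g. in spark-shell; column names invented):

    import org.apache.spark.sql.functions.{array_agg, array_size}

    Seq(Seq(1, 2), null).toDF("xs")
      .select(array_size($"xs")).show()   // 2, then null (not -1)

    Seq(1, 1, 2).toDF("v")
      .agg(array_agg($"v")).show()        // [1, 1, 2]; ordering is not guaranteed
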
/**
* Returns a random permutation of the given array.
*
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
index 5a80bdcdc0f..17f16822aa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions.{count_min_sketch, lit}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.sketch.CountMinSketch
@@ -42,4 +43,22 @@ class CountMinSketchAggQuerySuite extends QueryTest with SharedSparkSession {
assert(sketch == reference)
}
+
+ test("function count_min_sketch") {
+ import testImplicits._
+
+ val eps = 0.1
+ val confidence = 0.95
+ val seed = 11
+
+ val items = Seq(1, 1, 2, 2, 2, 2, 3, 4, 5)
+ val sketch = CountMinSketch.readFrom(items.toDF("id")
+ .select(count_min_sketch($"id", lit(eps), lit(confidence), lit(seed)))
+ .head().get(0).asInstanceOf[Array[Byte]])
+
+ val reference = CountMinSketch.create(eps, confidence, seed)
+ items.foreach(reference.add)
+
+ assert(sketch == reference)
+ }
}
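
The bytes returned by count_min_sketch deserialize into org.apache.spark.util.sketch.CountMinSketch, so the test above could also query the sketch directly; a hypothetical continuation:

    // 2 was added four times; a count-min sketch may overestimate but never underestimates.
    assert(sketch.estimateCount(2) >= 4L)
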
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 87fa57a7192..577d86c2d9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -575,6 +575,18 @@ class DataFrameAggregateSuite extends QueryTest
Seq(Set(1, 2, 3) -> Set(2, 4)): _*)
}
+ test("array_agg function") {
+ val df = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b")
+ checkAnswer(
+ df.selectExpr("array_agg(a)", "array_agg(b)"),
+ Seq(Row(Seq(1, 2, 3), Seq(2, 2, 4)))
+ )
+ checkAnswer(
+ df.select(array_agg($"a"), array_agg($"b")),
+ Seq(Row(Seq(1, 2, 3), Seq(2, 2, 4)))
+ )
+ }
+
test("collect functions structs") {
val df = Seq((1, 2, 2), (2, 2, 2), (3, 4, 1))
.toDF("a", "x", "y")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 419e00659ea..e7277451e04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -75,7 +75,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
"udaf", "udf" // create function statement in sql
)
- val excludedSqlFunctions = Set("array_agg", "cardinality")
+ val excludedSqlFunctions = Set.empty[String]
val expectedOnlyDataFrameFunctions = Set(
"bucket", "days", "hours", "months", "years", // Datasource v2 partition transformations
@@ -883,6 +883,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+ checkAnswer(df.select(cardinality($"a")), Seq(Row(2L), Row(0L), Row(3L), Row(sizeOfNull)))
checkAnswer(df.selectExpr("cardinality(a)"), Seq(Row(2L), Row(0L), Row(3L), Row(sizeOfNull)))
}
@@ -1103,6 +1104,8 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+ checkAnswer(df.select(cardinality($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+ checkAnswer(df.selectExpr("cardinality(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
}
test("map size function - legacy") {
@@ -1742,6 +1745,34 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.selectExpr("array_max(a)"), answer)
}
+ test("array_size function") {
+ val df = Seq(
+ Seq[Option[Int]](Some(1), Some(3), Some(2)),
+ Seq.empty[Option[Int]],
+ null,
+ Seq[Option[Int]](None, Some(1))
+ ).toDF("a")
+
+ val answer = Seq(Row(3), Row(0), Row(null), Row(2))
+
+ checkAnswer(df.select(array_size(df("a"))), answer)
+ checkAnswer(df.selectExpr("array_size(a)"), answer)
+ }
+
+ test("cardinality function") {
+ val df = Seq(
+ Seq[Option[Int]](Some(1), Some(3), Some(2)),
+ Seq.empty[Option[Int]],
+ null,
+ Seq[Option[Int]](None, Some(1))
+ ).toDF("a")
+
+ // With the default settings (spark.sql.legacy.sizeOfNull=true, ANSI mode off),
+ // cardinality returns -1 for a null array, unlike array_size which returns null.
+ val answer = Seq(Row(3), Row(0), Row(-1), Row(2))
+
+ checkAnswer(df.select(cardinality(df("a"))), answer)
+ checkAnswer(df.selectExpr("cardinality(a)"), answer)
+ }
+
test("sequence") {
checkAnswer(Seq((-2, 2)).toDF().select(sequence($"_1", $"_2")),
Seq(Row(Array(-2, -1, 0, 1, 2))))
@@ -5638,6 +5669,40 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
)
}
+ test("mask function") {
+ val df = Seq("AbCD123-@$#", "abcd-EFGH-8765-4321").toDF("a")
+
+ checkAnswer(df.selectExpr("mask(a)"),
+ Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+ checkAnswer(df.select(mask($"a")),
+ Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+
+ checkAnswer(df.selectExpr("mask(a, 'Y')"),
+ Seq(Row("YxYYnnn-@$#"), Row("xxxx-YYYY-nnnn-nnnn")))
+ checkAnswer(df.select(mask($"a", lit('Y'))),
+ Seq(Row("YxYYnnn-@$#"), Row("xxxx-YYYY-nnnn-nnnn")))
+
+ checkAnswer(df.selectExpr("mask(a, 'Y', 'y')"),
+ Seq(Row("YyYYnnn-@$#"), Row("yyyy-YYYY-nnnn-nnnn")))
+ checkAnswer(df.select(mask($"a", lit('Y'), lit('y'))),
+ Seq(Row("YyYYnnn-@$#"), Row("yyyy-YYYY-nnnn-nnnn")))
+
+ checkAnswer(df.selectExpr("mask(a, 'Y', 'y', 'd')"),
+ Seq(Row("YyYYddd-@$#"), Row("yyyy-YYYY-dddd-dddd")))
+ checkAnswer(df.select(mask($"a", lit('Y'), lit('y'), lit('d'))),
+ Seq(Row("YyYYddd-@$#"), Row("yyyy-YYYY-dddd-dddd")))
+
+ checkAnswer(df.selectExpr("mask(a, 'X', 'x', 'n', null)"),
+ Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+ checkAnswer(df.select(mask($"a", lit('X'), lit('x'), lit('n'), lit(null))),
+ Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+
+ checkAnswer(df.selectExpr("mask(a, null, null, null, '*')"),
+ Seq(Row("AbCD123****"), Row("abcd*EFGH*8765*4321")))
+ checkAnswer(df.select(mask($"a", lit(null), lit(null), lit(null), lit('*'))),
+ Seq(Row("AbCD123****"), Row("abcd*EFGH*8765*4321")))
+ }
+
test("test array_compact") {
val df = Seq(
(Array[Integer](null, 1, 2, null, 3, 4),
@@ -5818,6 +5883,24 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.selectExpr("CURRENT_USER()"), df.select(current_user()))
checkAnswer(df.selectExpr("USER()"), df.select(user()))
}
+
+ test("named_struct function") {
+ val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+ val expectedSchema = StructType(
+ StructField(
+ "value",
+ StructType(StructField("x", IntegerType, false) ::
+ StructField("y", IntegerType, false) :: Nil),
+ false) :: Nil)
+ val df1 = df.selectExpr("named_struct('x', a, 'y', b) value")
+ val df2 = df.select(named_struct(lit("x"), $"a", lit("y"), $"b")).toDF("value")
+
+ checkAnswer(df1, Seq(Row(Row(1, 2))))
+ assert(df1.schema === expectedSchema)
+
+ checkAnswer(df2, Seq(Row(Row(1, 2))))
+ assert(df2.schema === expectedSchema)
+ }
}
object DataFrameFunctionsSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index d2ffea07921..187fab75f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -1370,4 +1370,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
)
)
}
+
+ test("json_array_length function") {
+ val df = Seq(null, "[]", "[1, 2, 3]", "{\"key\": 1}", "invalid json")
+ .toDF("a")
+
+ val expected = Seq(Row(null), Row(0), Row(3), Row(null), Row(null))
+
+ checkAnswer(df.selectExpr("json_array_length(a)"), expected)
+ checkAnswer(df.select(json_array_length($"a")), expected)
+ }
+
+ test("json_object_keys function") {
+ val df = Seq(null, "{}", "{\"key1\":1, \"key2\": 2}", "[1, 2, 3]", "invalid json")
+ .toDF("a")
+
+ val expected = Seq(Row(null), Row(Seq.empty),
+ Row(Seq("key1", "key2")), Row(null), Row(null))
+
+ checkAnswer(df.selectExpr("json_object_keys(a)"), expected)
+ checkAnswer(df.select(json_object_keys($"a")), expected)
+ }
}