Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/30 05:36:15 UTC

[spark] branch master updated: [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new de8ec74f282 [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python
de8ec74f282 is described below

commit de8ec74f2826db3815275b4fccef186f22c85833
Author: Tengfei Huang <te...@gmail.com>
AuthorDate: Thu Jun 29 22:35:31 2023 -0700

    [SPARK-43926][CONNECT][PYTHON] Add array_agg, array_size, cardinality, count_min_sketch, mask, named_struct, json_* to Scala and Python
    
    ### What changes were proposed in this pull request?
    
    Add the following functions (a brief usage sketch follows the lists):
    - array_agg
    - array_size
    - cardinality
    - count_min_sketch
    - named_struct
    - json_array_length
    - json_object_keys
    - mask
    
    To:
    
    - Scala API
    - Python API
    - Spark Connect Scala Client
    - Spark Connect Python Client
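
    A usage sketch only, assuming a `SparkSession` named `spark` built from this
    branch; expected values follow the docstring examples in this patch:

        import org.apache.spark.sql.functions._
        import spark.implicits._

        val df = Seq((1, "AbCD123-@$#", "[1, 2, 3]")).toDF("a", "g", "j")
        df.agg(array_agg($"a"))                 // collects values, duplicates kept
        df.select(named_struct(lit("x"), $"a")) // struct with field name "x"
        df.select(mask($"g"))                   // "XxXXnnn-@$#"
        df.select(json_array_length($"j"))      // 3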
    
    ### Why are the changes needed?
    Add Scala, Python, and Connect APIs for the following SQL functions: array_agg, array_size, cardinality, count_min_sketch, named_struct, json_array_length, json_object_keys, mask.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, added new functions.
    
    ### How was this patch tested?
    New unit tests added.
    
    Closes #41718 from ivoson/SPARK-43926.
    
    Lead-authored-by: Tengfei Huang <te...@gmail.com>
    Co-authored-by: Tengfei Huang <te...@microsoft.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     | 159 +++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  48 ++++
 .../explain-results/function_array_agg.explain     |   2 +
 .../explain-results/function_array_size.explain    |   2 +
 .../explain-results/function_cardinality.explain   |   2 +
 .../function_count_min_sketch.explain              |   2 +
 .../function_json_array_length.explain             |   2 +
 .../function_json_object_keys.explain              |   2 +
 .../explain-results/function_mask.explain          |   2 +
 .../function_mask_with_specific_upperChar.explain  |   2 +
 ..._mask_with_specific_upperChar_lowerChar.explain |   2 +
 ..._specific_upperChar_lowerChar_digitChar.explain |   2 +
 ...upperChar_lowerChar_digitChar_otherChar.explain |   2 +
 .../explain-results/function_named_struct.explain  |   2 +
 .../query-tests/queries/function_array_agg.json    |  25 ++
 .../queries/function_array_agg.proto.bin           | Bin 0 -> 178 bytes
 .../query-tests/queries/function_array_size.json   |  25 ++
 .../queries/function_array_size.proto.bin          | Bin 0 -> 179 bytes
 .../query-tests/queries/function_cardinality.json  |  25 ++
 .../queries/function_cardinality.proto.bin         | Bin 0 -> 180 bytes
 .../queries/function_count_min_sketch.json         |  37 +++
 .../queries/function_count_min_sketch.proto.bin    | Bin 0 -> 217 bytes
 .../queries/function_json_array_length.json        |  25 ++
 .../queries/function_json_array_length.proto.bin   | Bin 0 -> 186 bytes
 .../queries/function_json_object_keys.json         |  25 ++
 .../queries/function_json_object_keys.proto.bin    | Bin 0 -> 185 bytes
 .../query-tests/queries/function_mask.json         |  25 ++
 .../query-tests/queries/function_mask.proto.bin    | Bin 0 -> 173 bytes
 .../function_mask_with_specific_upperChar.json     |  29 +++
 ...function_mask_with_specific_upperChar.proto.bin | Bin 0 -> 180 bytes
 ...ion_mask_with_specific_upperChar_lowerChar.json |  33 +++
 ...ask_with_specific_upperChar_lowerChar.proto.bin | Bin 0 -> 187 bytes
 ...ith_specific_upperChar_lowerChar_digitChar.json |  37 +++
 ...pecific_upperChar_lowerChar_digitChar.proto.bin | Bin 0 -> 194 bytes
 ...ic_upperChar_lowerChar_digitChar_otherChar.json |  41 ++++
 ...perChar_lowerChar_digitChar_otherChar.proto.bin | Bin 0 -> 201 bytes
 .../query-tests/queries/function_named_struct.json |  37 +++
 .../queries/function_named_struct.proto.bin        | Bin 0 -> 203 bytes
 .../source/reference/pyspark.sql/functions.rst     |   8 +
 python/pyspark/sql/connect/functions.py            |  74 ++++++
 python/pyspark/sql/functions.py                    | 254 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     | 180 +++++++++++++++
 .../spark/sql/CountMinSketchAggQuerySuite.scala    |  19 ++
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  12 +
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |  85 ++++++-
 .../org/apache/spark/sql/JsonFunctionsSuite.scala  |  21 ++
 46 files changed, 1247 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index ed0c13b2145..2c1a966dc71 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -137,6 +137,15 @@ object functions {
     case s: Symbol => new Column(s.name)
     case _ => createLiteral(create(literal))
   }
+
+  /**
+   * Creates a struct with the given field names and values.
+   *
+   * @group normal_funcs
+   * @since 3.5.0
+   */
+  def named_struct(cols: Column*): Column = Column.fn("named_struct", cols: _*)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Sort functions
   //////////////////////////////////////////////////////////////////////////////////////////////
@@ -1275,6 +1284,29 @@ object functions {
    */
   def bit_xor(e: Column): Column = Column.fn("bit_xor", e)
 
+  /**
+   * Aggregate function: returns a list of objects with duplicates.
+   *
+   * @note
+   *   The function is non-deterministic because the order of collected results depends on the
+   *   order of the rows which may be non-deterministic after a shuffle.
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def array_agg(e: Column): Column = Column.fn("array_agg", e)
+
+  /**
+   * Returns a count-min sketch of a column with the given eps, confidence and seed. The result is
+   * an array of bytes, which can be deserialized to a `CountMinSketch` before usage. Count-min
+   * sketch is a probabilistic data structure used for cardinality estimation using sub-linear
+   * space.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def count_min_sketch(e: Column, eps: Column, confidence: Column, seed: Column): Column =
+    Column.fn("count_min_sketch", e, eps, confidence, seed)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
   //////////////////////////////////////////////////////////////////////////////////////////////
@@ -4454,6 +4486,94 @@ object functions {
    */
   def right(str: Column, len: Column): Column = Column.fn("right", str, len)
 
+  /**
+   * Masks the given string value. The function replaces upper-case characters with 'X',
+   * lower-case characters with 'x', and numbers with 'n'. This can be useful for creating
+   * copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column): Column = Column.fn("mask", input)
+
+  /**
+   * Masks the given string value. The function replaces upper-case characters with the
+   * specified character, lower-case characters with 'x', and numbers with 'n'. This can be
+   * useful for creating copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column): Column =
+    Column.fn("mask", input, upperChar)
+
+  /**
+   * Masks the given string value. The function replaces upper-case and lower-case characters
+   * with the specified characters, and numbers with 'n'. This can be useful for creating
+   * copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column, lowerChar: Column): Column =
+    Column.fn("mask", input, upperChar, lowerChar)
+
+  /**
+   * Masks the given string value. The function replaces upper-case characters, lower-case
+   * characters, and numbers with the specified characters. This can be useful for creating
+   * copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   * @param digitChar
+   *   character to replace digit characters with. Specify NULL to retain original character.
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column, lowerChar: Column, digitChar: Column): Column =
+    Column.fn("mask", input, upperChar, lowerChar, digitChar)
+
+  /**
+   * Masks the given string value. This can be useful for creating copies of tables with sensitive
+   * information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   * @param digitChar
+   *   character to replace digit characters with. Specify NULL to retain original character.
+   * @param otherChar
+   *   character to replace all other characters with. Specify NULL to retain original character.
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(
+      input: Column,
+      upperChar: Column,
+      lowerChar: Column,
+      digitChar: Column,
+      otherChar: Column): Column =
+    Column.fn("mask", input, upperChar, lowerChar, digitChar, otherChar)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // DateTime functions
   //////////////////////////////////////////////////////////////////////////////////////////////
@@ -6927,6 +7047,45 @@ object functions {
    */
   def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
 
+  /**
+   * Returns the total number of elements in the array. The function returns null for null input.
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def array_size(e: Column): Column = Column.fn("array_size", e)
+
+  /**
+   * Returns the length of the array or map. This is an alias of the `size` function.
+   *
+   * The function returns null for null input if spark.sql.legacy.sizeOfNull is set to false or
+   * spark.sql.ansi.enabled is set to true. Otherwise, the function returns -1 for null input.
+   * With the default settings, the function returns -1 for null input.
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def cardinality(e: Column): Column = Column.fn("cardinality", e)
+
+  /**
+   * Returns the number of elements in the outermost JSON array. `NULL` is returned for any
+   * other valid JSON string, for `NULL` input, or for invalid JSON.
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def json_array_length(e: Column): Column = Column.fn("json_array_length", e)
+
+  /**
+   * Returns all the keys of the outermost JSON object as an array. If a valid JSON object is
+   * given, the keys of its outermost object are returned as an array. For any other valid
+   * JSON string, an invalid JSON string, or an empty string, the function returns null.
+   *
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def json_object_keys(e: Column): Column = Column.fn("json_object_keys", e)
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Partition Transforms functions
   //////////////////////////////////////////////////////////////////////////////////////////////
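
The `count_min_sketch` Scaladoc above notes that the resulting byte array "can be
deserialized to a `CountMinSketch` before usage". A minimal sketch of that step,
assuming a DataFrame `df` with an integer column `a`:

    import org.apache.spark.sql.functions._
    import org.apache.spark.util.sketch.CountMinSketch

    // Aggregate the column into sketch bytes, then rebuild the data structure.
    val bytes = df
      .agg(count_min_sketch(col("a"), lit(0.1), lit(0.95), lit(11)).as("cms"))
      .head()
      .getAs[Array[Byte]]("cms")
    val cms = CountMinSketch.readFrom(bytes)
    cms.estimateCount(1) // approximate frequency of the value 1
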
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index ecb7092b8d9..875f10872a2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2698,6 +2698,54 @@ class PlanGenerationTestSuite
     fn.right(fn.col("g"), fn.col("g"))
   }
 
+  functionTest("array_agg") {
+    fn.array_agg(fn.col("a"))
+  }
+
+  functionTest("array_size") {
+    fn.array_size(fn.col("e"))
+  }
+
+  functionTest("cardinality") {
+    fn.cardinality(fn.col("f"))
+  }
+
+  functionTest("count_min_sketch") {
+    fn.count_min_sketch(fn.col("a"), fn.lit(0.1), fn.lit(0.95), fn.lit(11))
+  }
+
+  functionTest("named_struct") {
+    fn.named_struct(fn.lit("x"), fn.col("a"), fn.lit("y"), fn.col("id"))
+  }
+
+  functionTest("json_array_length") {
+    fn.json_array_length(fn.col("g"))
+  }
+
+  functionTest("json_object_keys") {
+    fn.json_object_keys(fn.col("g"))
+  }
+
+  functionTest("mask with specific upperChar lowerChar digitChar otherChar") {
+    fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'), fn.lit('n'), fn.lit('*'))
+  }
+
+  functionTest("mask with specific upperChar lowerChar digitChar") {
+    fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'), fn.lit('n'))
+  }
+
+  functionTest("mask with specific upperChar lowerChar") {
+    fn.mask(fn.col("g"), fn.lit('X'), fn.lit('x'))
+  }
+
+  functionTest("mask with specific upperChar") {
+    fn.mask(fn.col("g"), fn.lit('X'))
+  }
+
+  functionTest("mask") {
+    fn.mask(fn.col("g"))
+  }
+
   functionTest("aes_encrypt with mode padding iv aad") {
     fn.aes_encrypt(
       fn.col("g"),
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain
new file mode 100644
index 00000000000..102f736c62e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_agg.explain
@@ -0,0 +1,2 @@
+Aggregate [collect_list(a#0, 0, 0) AS collect_list(a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain
new file mode 100644
index 00000000000..faf923d68d6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_size.explain
@@ -0,0 +1,2 @@
+Project [size(e#0, false) AS array_size(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain
new file mode 100644
index 00000000000..3392c67cda6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_cardinality.explain
@@ -0,0 +1,2 @@
+Project [cardinality(f#0, true) AS cardinality(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain
new file mode 100644
index 00000000000..4baee060c91
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_count_min_sketch.explain
@@ -0,0 +1,2 @@
+Aggregate [count_min_sketch(a#0, 0.1, 0.95, 11, 0, 0) AS count_min_sketch(a, 0.1, 0.95, 11)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain
new file mode 100644
index 00000000000..50ab91560e6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_array_length.explain
@@ -0,0 +1,2 @@
+Project [json_array_length(g#0) AS json_array_length(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain
new file mode 100644
index 00000000000..30153bb192e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_object_keys.explain
@@ -0,0 +1,2 @@
+Project [json_object_keys(g#0) AS json_object_keys(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain
new file mode 100644
index 00000000000..bd2f345656e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, null) AS mask(g, X, x, n, NULL)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain
new file mode 100644
index 00000000000..9f3f378f0ab
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.explain
@@ -0,0 +1,2 @@
+Project [mask(g#0, X, x, n, *) AS mask(g, X, x, n, *)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain
new file mode 100644
index 00000000000..bafbd5a650f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_named_struct.explain
@@ -0,0 +1,2 @@
+Project [named_struct(x, a#0, y, id#0L) AS named_struct(x, a, y, id)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json
new file mode 100644
index 00000000000..a3197ce9506
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_agg",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin
new file mode 100644
index 00000000000..c7306df8621
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_array_agg.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json
new file mode 100644
index 00000000000..c1c618bc7f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_size",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin
new file mode 100644
index 00000000000..47949dfbbda
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_array_size.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json
new file mode 100644
index 00000000000..e2b3dd04287
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "cardinality",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin
new file mode 100644
index 00000000000..54c8cfe8434
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_cardinality.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json
new file mode 100644
index 00000000000..94be79dcc33
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "count_min_sketch",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "literal": {
+            "double": 0.1
+          }
+        }, {
+          "literal": {
+            "double": 0.95
+          }
+        }, {
+          "literal": {
+            "integer": 11
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin
new file mode 100644
index 00000000000..11bcae8062e
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_count_min_sketch.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json
new file mode 100644
index 00000000000..36223a451e3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "json_array_length",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin
new file mode 100644
index 00000000000..817c803d830
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_json_array_length.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json
new file mode 100644
index 00000000000..f8667a1012a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "json_object_keys",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin
new file mode 100644
index 00000000000..4be9477ec91
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_json_object_keys.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json
new file mode 100644
index 00000000000..c0473466a3e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "mask",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin
new file mode 100644
index 00000000000..5e94c267593
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json
new file mode 100644
index 00000000000..571d514e72d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "mask",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "X"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin
new file mode 100644
index 00000000000..0f6c4b579c4
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json
new file mode 100644
index 00000000000..ae527d70cf1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "mask",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "X"
+          }
+        }, {
+          "literal": {
+            "string": "x"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin
new file mode 100644
index 00000000000..5a6b4d7caa6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json
new file mode 100644
index 00000000000..e7fee11d316
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "mask",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "X"
+          }
+        }, {
+          "literal": {
+            "string": "x"
+          }
+        }, {
+          "literal": {
+            "string": "n"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin
new file mode 100644
index 00000000000..f0a2e7cb643
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json
new file mode 100644
index 00000000000..d6076ae558b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.json
@@ -0,0 +1,41 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "mask",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "X"
+          }
+        }, {
+          "literal": {
+            "string": "x"
+          }
+        }, {
+          "literal": {
+            "string": "n"
+          }
+        }, {
+          "literal": {
+            "string": "*"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin
new file mode 100644
index 00000000000..cb5f090361b
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_mask_with_specific_upperChar_lowerChar_digitChar_otherChar.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json
new file mode 100644
index 00000000000..c4d92131ed0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "named_struct",
+        "arguments": [{
+          "literal": {
+            "string": "x"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "literal": {
+            "string": "y"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin
new file mode 100644
index 00000000000..b595cfc2820
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_named_struct.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index 8e8561920c5..c0de3b233fb 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -35,6 +35,7 @@ Normal Functions
     isnan
     isnull
     monotonically_increasing_id
+    named_struct
     nanvl
     rand
     randn
@@ -202,6 +203,7 @@ Collection Functions
     array_position
     element_at
     array_append
+    array_size
     array_sort
     array_insert
     array_remove
@@ -234,7 +236,10 @@ Collection Functions
     from_json
     schema_of_json
     to_json
+    json_array_length
+    json_object_keys
     size
+    cardinality
     struct
     sort_array
     array_max
@@ -279,6 +284,7 @@ Aggregate Functions
     approxCountDistinct
     approx_count_distinct
     approx_percentile
+    array_agg
     avg
     bit_and
     bit_or
@@ -291,6 +297,7 @@ Aggregate Functions
     count
     count_distinct
     countDistinct
+    count_min_sketch
     count_if
     covar_pop
     covar_samp
@@ -402,6 +409,7 @@ String Functions
     locate
     lpad
     ltrim
+    mask
     octet_length
     parse_url
     position
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index 134465a7882..697f969b0b0 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -954,6 +954,13 @@ def collect_list(col: "ColumnOrName") -> Column:
 collect_list.__doc__ = pysparkfuncs.collect_list.__doc__
 
 
+def array_agg(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("array_agg", col)
+
+
+array_agg.__doc__ = pysparkfuncs.array_agg.__doc__
+
+
 def collect_set(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("collect_set", col)
 
@@ -1025,6 +1032,18 @@ def grouping_id(*cols: "ColumnOrName") -> Column:
 grouping_id.__doc__ = pysparkfuncs.grouping_id.__doc__
 
 
+def count_min_sketch(
+    col: "ColumnOrName",
+    eps: "ColumnOrName",
+    confidence: "ColumnOrName",
+    seed: "ColumnOrName",
+) -> Column:
+    return _invoke_function_over_columns("count_min_sketch", col, eps, confidence, seed)
+
+
+count_min_sketch.__doc__ = pysparkfuncs.count_min_sketch.__doc__
+
+
 def kurtosis(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("kurtosis", col)
 
@@ -1593,6 +1612,20 @@ def array_min(col: "ColumnOrName") -> Column:
 array_min.__doc__ = pysparkfuncs.array_min.__doc__
 
 
+def array_size(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("array_size", col)
+
+
+array_size.__doc__ = pysparkfuncs.array_size.__doc__
+
+
+def cardinality(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("cardinality", col)
+
+
+cardinality.__doc__ = pysparkfuncs.cardinality.__doc__
+
+
 def array_position(col: "ColumnOrName", value: Any) -> Column:
     return _invoke_function("array_position", _to_col(col), lit(value))
 
@@ -1799,6 +1832,20 @@ def get_json_object(col: "ColumnOrName", path: str) -> Column:
 get_json_object.__doc__ = pysparkfuncs.get_json_object.__doc__
 
 
+def json_array_length(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("json_array_length", col)
+
+
+json_array_length.__doc__ = pysparkfuncs.json_array_length.__doc__
+
+
+def json_object_keys(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("json_object_keys", col)
+
+
+json_object_keys.__doc__ = pysparkfuncs.json_object_keys.__doc__
+
+
 def inline(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("inline", col)
 
@@ -2039,6 +2086,13 @@ def struct(
 struct.__doc__ = pysparkfuncs.struct.__doc__
 
 
+def named_struct(*cols: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("named_struct", *cols)
+
+
+named_struct.__doc__ = pysparkfuncs.named_struct.__doc__
+
+
 def to_csv(col: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
     if options is None:
         return _invoke_function("to_csv", _to_col(col))
@@ -2667,6 +2721,26 @@ def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
 right.__doc__ = pysparkfuncs.right.__doc__
 
 
+def mask(
+    col: "ColumnOrName",
+    upperChar: Optional["ColumnOrName"] = None,
+    lowerChar: Optional["ColumnOrName"] = None,
+    digitChar: Optional["ColumnOrName"] = None,
+    otherChar: Optional["ColumnOrName"] = None,
+) -> Column:
+    _upperChar = lit("X") if upperChar is None else upperChar
+    _lowerChar = lit("x") if lowerChar is None else lowerChar
+    _digitChar = lit("n") if digitChar is None else digitChar
+    _otherChar = lit(None) if otherChar is None else otherChar
+
+    return _invoke_function_over_columns(
+        "mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
+    )
+
+
+mask.__doc__ = pysparkfuncs.mask.__doc__
+
+
 # Date/Timestamp functions
 # TODO(SPARK-41455): Resolve dtypes inconsistencies for:
 #     to_timestamp, from_utc_timestamp, to_utc_timestamp,
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index c753b59bec5..c28e837759c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -3145,6 +3145,32 @@ def collect_list(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("collect_list", col)
 
 
+@try_remote_functions
+def array_agg(col: "ColumnOrName") -> Column:
+    """
+    Aggregate function: returns a list of objects with duplicates.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        list of objects with duplicates.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([[1],[1],[2]], ["c"])
+    >>> df.agg(array_agg('c').alias('r')).collect()
+    [Row(r=[1, 1, 2])]
+    """
+    return _invoke_function_over_columns("array_agg", col)
+
+
 @try_remote_functions
 def collect_set(col: "ColumnOrName") -> Column:
     """
@@ -4069,6 +4095,47 @@ def grouping_id(*cols: "ColumnOrName") -> Column:
     return _invoke_function_over_seq_of_columns("grouping_id", cols)
 
 
+@try_remote_functions
+def count_min_sketch(
+    col: "ColumnOrName",
+    eps: "ColumnOrName",
+    confidence: "ColumnOrName",
+    seed: "ColumnOrName",
+) -> Column:
+    """
+    Returns a count-min sketch of a column with the given eps, confidence and seed.
+    The result is an array of bytes, which can be deserialized to a `CountMinSketch` before usage.
+    Count-min sketch is a probabilistic data structure used for cardinality estimation
+    using sub-linear space.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+    eps : :class:`~pyspark.sql.Column` or str
+        relative error, must be positive
+    confidence : :class:`~pyspark.sql.Column` or str
+        confidence, must be positive and less than 1.0
+    seed : :class:`~pyspark.sql.Column` or str
+        random seed
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        count-min sketch of the column
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([[1], [2], [1]], ['data'])
+    >>> df = df.agg(count_min_sketch(df.data, lit(0.5), lit(0.5), lit(1)).alias('sketch'))
+    >>> df.select(hex(df.sketch).alias('r')).collect()
+    [Row(r='0000000100000000000000030000000100000004000000005D8D6AB90000000000000000000000000000000200000000000000010000000000000000')]
+    """
+    return _invoke_function_over_columns("count_min_sketch", col, eps, confidence, seed)
+
+
 @try_remote_functions
 def input_file_name() -> Column:
     """
@@ -4893,6 +4960,31 @@ def struct(
     return _invoke_function_over_seq_of_columns("struct", cols)  # type: ignore[arg-type]
 
 
+@try_remote_functions
+def named_struct(*cols: "ColumnOrName") -> Column:
+    """
+    Creates a struct with the given field names and values.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        list of columns to work on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 2, 3)], ['a', 'b', 'c'])
+    >>> df.select(named_struct(lit('x'), df.a, lit('y'), df.b).alias('r')).collect()
+    [Row(r=Row(x=1, y=2))]
+    """
+    return _invoke_function_over_seq_of_columns("named_struct", cols)
+
+
 @try_remote_functions
 def greatest(*cols: "ColumnOrName") -> Column:
     """
@@ -10542,6 +10634,61 @@ def right(str: "ColumnOrName", len: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("right", str, len)
 
 
+@try_remote_functions
+def mask(
+    col: "ColumnOrName",
+    upperChar: Optional["ColumnOrName"] = None,
+    lowerChar: Optional["ColumnOrName"] = None,
+    digitChar: Optional["ColumnOrName"] = None,
+    otherChar: Optional["ColumnOrName"] = None,
+) -> Column:
+    """
+    Masks the given string value. This can be useful for creating copies of tables with sensitive
+    information removed.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col: :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+    upperChar: :class:`~pyspark.sql.Column` or str
+        character to replace upper-case characters with. Specify NULL to retain original character.
+    lowerChar: :class:`~pyspark.sql.Column` or str
+        character to replace lower-case characters with. Specify NULL to retain original character.
+    digitChar: :class:`~pyspark.sql.Column` or str
+        character to replace digit characters with. Specify NULL to retain original character.
+    otherChar: :class:`~pyspark.sql.Column` or str
+        character to replace all other characters with. Specify NULL to retain original character.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("AbCD123-@$#",), ("abcd-EFGH-8765-4321",)], ['data'])
+    >>> df.select(mask(df.data).alias('r')).collect()
+    [Row(r='XxXXnnn-@$#'), Row(r='xxxx-XXXX-nnnn-nnnn')]
+    >>> df.select(mask(df.data, lit('Y')).alias('r')).collect()
+    [Row(r='YxYYnnn-@$#'), Row(r='xxxx-YYYY-nnnn-nnnn')]
+    >>> df.select(mask(df.data, lit('Y'), lit('y')).alias('r')).collect()
+    [Row(r='YyYYnnn-@$#'), Row(r='yyyy-YYYY-nnnn-nnnn')]
+    >>> df.select(mask(df.data, lit('Y'), lit('y'), lit('d')).alias('r')).collect()
+    [Row(r='YyYYddd-@$#'), Row(r='yyyy-YYYY-dddd-dddd')]
+    >>> df.select(mask(df.data, lit('Y'), lit('y'), lit('d'), lit('*')).alias('r')).collect()
+    [Row(r='YyYYddd****'), Row(r='yyyy*YYYY*dddd*dddd')]
+    """
+
+    _upperChar = lit("X") if upperChar is None else upperChar
+    _lowerChar = lit("x") if lowerChar is None else lowerChar
+    _digitChar = lit("n") if digitChar is None else digitChar
+    _otherChar = lit(None) if otherChar is None else otherChar
+    return _invoke_function_over_columns(
+        "mask", col, _upperChar, _lowerChar, _digitChar, _otherChar
+    )
+
+
 # ---------------------- Collection functions ------------------------------
 
 
@@ -11864,6 +12011,61 @@ def schema_of_json(json: "ColumnOrName", options: Optional[Dict[str, str]] = Non
     return _invoke_function("schema_of_json", col, _options_to_str(options))
 
 
+@try_remote_functions
+def json_array_length(col: "ColumnOrName") -> Column:
+    """
+    Returns the number of elements in the outermost JSON array. `NULL` is returned for any
+    other valid JSON string, for `NULL` input, or for invalid JSON.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col: :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        length of json array.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(None,), ('[1, 2, 3]',), ('[]',)], ['data'])
+    >>> df.select(json_array_length(df.data).alias('r')).collect()
+    [Row(r=None), Row(r=3), Row(r=0)]
+    """
+    return _invoke_function_over_columns("json_array_length", col)
+
+
+@try_remote_functions
+def json_object_keys(col: "ColumnOrName") -> Column:
+    """
+    Returns all the keys of the outermost JSON object as an array. If a valid JSON object is
+    given, the keys of its outermost object are returned as an array. For any other valid
+    JSON string, an invalid JSON string, or an empty string, the function returns null.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col: :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        all the keys of the outermost JSON object.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(None,), ('{}',), ('{"key1":1, "key2":2}',)], ['data'])
+    >>> df.select(json_object_keys(df.data).alias('r')).collect()
+    [Row(r=None), Row(r=[]), Row(r=['key1', 'key2'])]
+    """
+    return _invoke_function_over_columns("json_object_keys", col)
+
+
 @try_remote_functions
 def schema_of_csv(csv: "ColumnOrName", options: Optional[Dict[str, str]] = None) -> Column:
     """
@@ -12037,6 +12239,58 @@ def array_max(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("array_max", col)
 
 
+@try_remote_functions
+def array_size(col: "ColumnOrName") -> Column:
+    """
+    Returns the total number of elements in the array. The function returns null for null input.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        total number of elements in the array.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([2, 1, 3],), (None,)], ['data'])
+    >>> df.select(array_size(df.data).alias('r')).collect()
+    [Row(r=3), Row(r=None)]
+    """
+    return _invoke_function_over_columns("array_size", col)
+
+
+@try_remote_functions
+def cardinality(col: "ColumnOrName") -> Column:
+    """
+    Collection function: returns the length of the array or map stored in the column.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        target column to compute on.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        length of the array/map.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
+    >>> df.select(cardinality(df.data).alias('r')).collect()
+    [Row(r=3), Row(r=1), Row(r=0)]
+    """
+    return _invoke_function_over_columns("cardinality", col)
+
+
 @try_remote_functions
 def sort_array(col: "ColumnOrName", asc: bool = True) -> Column:
     """
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index b01bf1c6a3c..1feeffea267 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -368,6 +368,23 @@ object functions {
    */
   def collect_set(columnName: String): Column = collect_set(Column(columnName))
 
+  /**
+   * Returns a count-min sketch of a column with the given eps, confidence and seed. The result
+   * is an array of bytes, which can be deserialized to a `CountMinSketch` before usage.
+   * Count-min sketch is a probabilistic data structure used for cardinality estimation using
+   * sub-linear space.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def count_min_sketch(
+    e: Column,
+    eps: Column,
+    confidence: Column,
+    seed: Column): Column = withAggregateFunction {
+    new CountMinSketchAgg(e.expr, eps.expr, confidence.expr, seed.expr)
+  }
+
   private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =
     withAggregateFunction { CollectTopK(e.expr, num, reverse) }
 
@@ -1624,6 +1641,14 @@ object functions {
   @scala.annotation.varargs
   def map(cols: Column*): Column = withExpr { CreateMap(cols.map(_.expr)) }
 
+  /**
+   * Creates a struct with the given field names and values. Arguments should alternate
+   * between field names and values.
+   *
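+   * A minimal sketch for illustration (the `df`, `a` and `b` names are assumptions):
+   * {{{
+   *   // Builds a struct column with fields "x" and "y" from columns "a" and "b".
+   *   df.select(named_struct(lit("x"), df("a"), lit("y"), df("b")))
+   * }}}
+   *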
+   * @group normal_funcs
+   * @since 3.5.0
+   */
+  def named_struct(cols: Column*): Column = withExpr { CreateNamedStruct(cols.map(_.expr)) }
+
   /**
    * Creates a new map column. The array in the first column is used for keys. The array in the
    * second column is used for values. All elements in the array for key should not be null.
@@ -6611,6 +6636,29 @@ object functions {
     withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
   }
 
+  /**
+   * Returns the number of elements in the outermost JSON array. `NULL` is returned if the
+   * input is `NULL`, an invalid JSON string, or any valid JSON string that is not an array.
+   *
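+   * A minimal sketch for illustration, assuming a `spark` session:
+   * {{{
+   *   // Yields 3; a malformed JSON string would yield null.
+   *   spark.range(1).select(json_array_length(lit("[1, 2, 3]")))
+   * }}}
+   *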
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def json_array_length(jsonArray: Column): Column = withExpr {
+    LengthOfJsonArray(jsonArray.expr)
+  }
+
+  /**
+   * Returns all the keys of the outermost JSON object as an array. If the input is any other
+   * valid JSON string, an invalid JSON string or an empty string, the function returns null.
+   *
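+   * A minimal sketch for illustration, assuming a `spark` session:
+   * {{{
+   *   // Yields Array("key1", "key2"); a JSON array input would yield null.
+   *   spark.range(1).select(json_object_keys(lit("""{"key1": 1, "key2": 2}""")))
+   * }}}
+   *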
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def json_object_keys(json: Column): Column = withExpr {
+    JsonObjectKeys(json.expr)
+  }
+
   // scalastyle:off line.size.limit
   /**
    * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
@@ -6671,6 +6719,108 @@ object functions {
   def to_json(e: Column): Column =
     to_json(e, Map.empty[String, String])
 
+  /**
+   * Masks the given string value. The function replaces upper-case characters with 'X',
+   * lower-case characters with 'x', digits with 'n', and leaves other characters unchanged.
+   * This can be useful for creating copies of tables with sensitive information removed.
+   *
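+   * A minimal sketch for illustration, assuming a `spark` session:
+   * {{{
+   *   // "AbCD123-@$#" becomes "XxXXnnn-@$#"; other characters are left unchanged.
+   *   spark.range(1).select(mask(lit("AbCD123-@$#")))
+   * }}}
+   *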
+   * @param input string value to mask. Supported types: STRING, VARCHAR, CHAR
+   *
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column): Column = withExpr {
+    new Mask(input.expr)
+  }
+
+  /**
+   * Masks the given string value. The function replaces upper-case characters with the
+   * specified character, lower-case characters with 'x', and digits with 'n'.
+   * This can be useful for creating copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   *
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column): Column = withExpr {
+    new Mask(input.expr, upperChar.expr)
+  }
+
+  /**
+   * Masks the given string value. The function replaces upper-case and lower-case characters
+   * with the specified characters, respectively, and digits with 'n'.
+   * This can be useful for creating copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   *
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column, lowerChar: Column): Column = withExpr {
+    new Mask(input.expr, upperChar.expr, lowerChar.expr)
+  }
+
+  /**
+   * Masks the given string value. The function replaces upper-case characters, lower-case
+   * characters and digits with the specified characters, respectively.
+   * This can be useful for creating copies of tables with sensitive information removed.
+   *
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   * @param digitChar
+   *   character to replace digit characters with. Specify NULL to retain original character.
+   *
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(input: Column, upperChar: Column, lowerChar: Column, digitChar: Column): Column = {
+    withExpr {
+      new Mask(input.expr, upperChar.expr, lowerChar.expr, digitChar.expr)
+    }
+  }
+
+  /**
+   * Masks the given string value. This can be useful for creating copies of tables with sensitive
+   * information removed.
+   *
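+   * A minimal sketch for illustration, assuming a `spark` session (a NULL replacement
+   * retains the original characters of that class):
+   * {{{
+   *   // "AbCD123-@$#" becomes "AbCD123****": only the non-alphanumeric characters change.
+   *   spark.range(1).select(
+   *     mask(lit("AbCD123-@$#"), lit(null), lit(null), lit(null), lit("*")))
+   * }}}
+   *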
+   * @param input
+   *   string value to mask. Supported types: STRING, VARCHAR, CHAR
+   * @param upperChar
+   *   character to replace upper-case characters with. Specify NULL to retain original character.
+   * @param lowerChar
+   *   character to replace lower-case characters with. Specify NULL to retain original character.
+   * @param digitChar
+   *   character to replace digit characters with. Specify NULL to retain original character.
+   * @param otherChar
+   *   character to replace all other characters with. Specify NULL to retain original character.
+   *
+   * @group string_funcs
+   * @since 3.5.0
+   */
+  def mask(
+      input: Column,
+      upperChar: Column,
+      lowerChar: Column,
+      digitChar: Column,
+      otherChar: Column): Column = {
+    withExpr {
+      Mask(input.expr, upperChar.expr, lowerChar.expr, digitChar.expr, otherChar.expr)
+    }
+  }
+
   /**
    * Returns length of array or map.
    *
@@ -6683,6 +6833,18 @@ object functions {
    */
   def size(e: Column): Column = withExpr { Size(e.expr) }
 
+  /**
+   * Returns length of array or map. This is an alias of the `size` function.
+   *
+   * The function returns null for null input if spark.sql.legacy.sizeOfNull is set to false or
+   * spark.sql.ansi.enabled is set to true. Otherwise, the function returns -1 for null input.
+   * With the default settings, the function returns -1 for null input.
+   *
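+   * A minimal sketch for illustration, assuming a `spark` session:
+   * {{{
+   *   // Yields 3 for a three-element array literal.
+   *   spark.range(1).select(cardinality(array(lit(1), lit(2), lit(3))))
+   * }}}
+   *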
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def cardinality(e: Column): Column = size(e)
+
   /**
    * Sorts the input array for the given column in ascending order,
    * according to the natural ordering of the array elements.
@@ -6723,6 +6885,24 @@ object functions {
    */
   def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }
 
+  /**
+   * Returns the total number of elements in the array. The function returns null for null input.
+   *
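+   * A minimal sketch for illustration, assuming a `spark` session:
+   * {{{
+   *   // Yields 2; a null array column would yield null.
+   *   spark.range(1).select(array_size(array(lit(1), lit(2))))
+   * }}}
+   *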
+   * @group collection_funcs
+   * @since 3.5.0
+   */
+  def array_size(e: Column): Column = withExpr { ArraySize(e.expr) }
+
+  /**
+   * Aggregate function: returns a list of objects with duplicates.
+   *
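+   * A minimal sketch for illustration (the `df` DataFrame and its `a` column are assumptions):
+   * {{{
+   *   // Collects every value of column "a" into one array, duplicates included.
+   *   df.select(array_agg(df("a")))
+   * }}}
+   *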
+   * @note The function is non-deterministic because the order of collected results depends
+   *       on the order of the rows which may be non-deterministic after a shuffle.
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def array_agg(e: Column): Column = collect_list(e)
+
   /**
    * Returns a random permutation of the given array.
    *
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
index 5a80bdcdc0f..17f16822aa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CountMinSketchAggQuerySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.{count_min_sketch, lit}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.sketch.CountMinSketch
 
@@ -42,4 +43,22 @@ class CountMinSketchAggQuerySuite extends QueryTest with SharedSparkSession {
 
     assert(sketch == reference)
   }
+
+  test("function count_min_sketch") {
+    import testImplicits._
+
+    val eps = 0.1
+    val confidence = 0.95
+    val seed = 11
+
+    val items = Seq(1, 1, 2, 2, 2, 2, 3, 4, 5)
+    val sketch = CountMinSketch.readFrom(items.toDF("id")
+      .select(count_min_sketch($"id", lit(eps), lit(confidence), lit(seed)))
+      .head().get(0).asInstanceOf[Array[Byte]])
+
+    val reference = CountMinSketch.create(eps, confidence, seed)
+    items.foreach(reference.add)
+
+    assert(sketch == reference)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 87fa57a7192..577d86c2d9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -575,6 +575,18 @@ class DataFrameAggregateSuite extends QueryTest
       Seq(Set(1, 2, 3) -> Set(2, 4)): _*)
   }
 
+  test("array_agg function") {
+    val df = Seq((1, 2), (2, 2), (3, 4)).toDF("a", "b")
+    checkAnswer(
+      df.selectExpr("array_agg(a)", "array_agg(b)"),
+      Seq(Row(Seq(1, 2, 3), Seq(2, 2, 4)))
+    )
+    checkAnswer(
+      df.select(array_agg($"a"), array_agg($"b")),
+      Seq(Row(Seq(1, 2, 3), Seq(2, 2, 4)))
+    )
+  }
+
   test("collect functions structs") {
     val df = Seq((1, 2, 2), (2, 2, 2), (3, 4, 1))
       .toDF("a", "x", "y")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 419e00659ea..e7277451e04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -75,7 +75,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       "udaf", "udf" // create function statement in sql
     )
 
-    val excludedSqlFunctions = Set("array_agg", "cardinality")
+    val excludedSqlFunctions = Set.empty[String]
 
     val expectedOnlyDataFrameFunctions = Set(
       "bucket", "days", "hours", "months", "years", // Datasource v2 partition transformations
@@ -883,6 +883,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
 
     checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
     checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+    checkAnswer(df.select(cardinality($"a")), Seq(Row(2L), Row(0L), Row(3L), Row(sizeOfNull)))
     checkAnswer(df.selectExpr("cardinality(a)"), Seq(Row(2L), Row(0L), Row(3L), Row(sizeOfNull)))
   }
 
@@ -1103,6 +1104,8 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
 
     checkAnswer(df.select(size($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
     checkAnswer(df.selectExpr("size(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+    checkAnswer(df.select(cardinality($"a")), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
+    checkAnswer(df.selectExpr("cardinality(a)"), Seq(Row(2), Row(0), Row(3), Row(sizeOfNull)))
   }
 
   test("map size function - legacy") {
@@ -1742,6 +1745,34 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df.selectExpr("array_max(a)"), answer)
   }
 
+  test("array_size function") {
+    val df = Seq(
+      Seq[Option[Int]](Some(1), Some(3), Some(2)),
+      Seq.empty[Option[Int]],
+      null,
+      Seq[Option[Int]](None, Some(1))
+    ).toDF("a")
+
+    val answer = Seq(Row(3), Row(0), Row(null), Row(2))
+
+    checkAnswer(df.select(array_size(df("a"))), answer)
+    checkAnswer(df.selectExpr("array_size(a)"), answer)
+  }
+
+  test("cardinality function") {
+    val df = Seq(
+      Seq[Option[Int]](Some(1), Some(3), Some(2)),
+      Seq.empty[Option[Int]],
+      null,
+      Seq[Option[Int]](None, Some(1))
+    ).toDF("a")
+
+    // With the default settings, cardinality (like size) returns -1 for a null array.
+    val answer = Seq(Row(3), Row(0), Row(-1), Row(2))
+
+    checkAnswer(df.select(cardinality(df("a"))), answer)
+    checkAnswer(df.selectExpr("cardinality(a)"), answer)
+  }
+
   test("sequence") {
     checkAnswer(Seq((-2, 2)).toDF().select(sequence($"_1", $"_2")),
       Seq(Row(Array(-2, -1, 0, 1, 2))))
@@ -5638,6 +5669,40 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  test("mask function") {
+    val df = Seq("AbCD123-@$#", "abcd-EFGH-8765-4321").toDF("a")
+
+    checkAnswer(df.selectExpr("mask(a)"),
+      Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+    checkAnswer(df.select(mask($"a")),
+      Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+
+    checkAnswer(df.selectExpr("mask(a, 'Y')"),
+      Seq(Row("YxYYnnn-@$#"), Row("xxxx-YYYY-nnnn-nnnn")))
+    checkAnswer(df.select(mask($"a", lit('Y'))),
+      Seq(Row("YxYYnnn-@$#"), Row("xxxx-YYYY-nnnn-nnnn")))
+
+    checkAnswer(df.selectExpr("mask(a, 'Y', 'y')"),
+      Seq(Row("YyYYnnn-@$#"), Row("yyyy-YYYY-nnnn-nnnn")))
+    checkAnswer(df.select(mask($"a", lit('Y'), lit('y'))),
+      Seq(Row("YyYYnnn-@$#"), Row("yyyy-YYYY-nnnn-nnnn")))
+
+    checkAnswer(df.selectExpr("mask(a, 'Y', 'y', 'd')"),
+      Seq(Row("YyYYddd-@$#"), Row("yyyy-YYYY-dddd-dddd")))
+    checkAnswer(df.select(mask($"a", lit('Y'), lit('y'), lit('d'))),
+      Seq(Row("YyYYddd-@$#"), Row("yyyy-YYYY-dddd-dddd")))
+
+    checkAnswer(df.selectExpr("mask(a, 'X', 'x', 'n', null)"),
+      Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+    checkAnswer(df.select(mask($"a", lit('X'), lit('x'), lit('n'), lit(null))),
+      Seq(Row("XxXXnnn-@$#"), Row("xxxx-XXXX-nnnn-nnnn")))
+
+    checkAnswer(df.selectExpr("mask(a, null, null, null, '*')"),
+      Seq(Row("AbCD123****"), Row("abcd*EFGH*8765*4321")))
+    checkAnswer(df.select(mask($"a", lit(null), lit(null), lit(null), lit('*'))),
+      Seq(Row("AbCD123****"), Row("abcd*EFGH*8765*4321")))
+  }
+
   test("test array_compact") {
     val df = Seq(
       (Array[Integer](null, 1, 2, null, 3, 4),
@@ -5818,6 +5883,24 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df.selectExpr("CURRENT_USER()"), df.select(current_user()))
     checkAnswer(df.selectExpr("USER()"), df.select(user()))
   }
+
+  test("named_struct function") {
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    val expectedSchema = StructType(
+      StructField(
+        "value",
+        StructType(StructField("x", IntegerType, false) ::
+          StructField("y", IntegerType, false) :: Nil),
+        false) :: Nil)
+    val df1 = df.selectExpr("named_struct('x', a, 'y', b) value")
+    val df2 = df.select(named_struct(lit("x"), $"a", lit("y"), $"b")).toDF("value")
+
+    checkAnswer(df1, Seq(Row(Row(1, 2))))
+    assert(df1.schema === expectedSchema)
+
+    checkAnswer(df2, Seq(Row(Row(1, 2))))
+    assert(df2.schema === expectedSchema)
+  }
 }
 
 object DataFrameFunctionsSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index d2ffea07921..187fab75f63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -1370,4 +1370,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       )
     )
   }
+
+  test("json_array_length function") {
+    val df = Seq(null, "[]", "[1, 2, 3]", "{\"key\": 1}", "invalid json")
+      .toDF("a")
+
+    val expected = Seq(Row(null), Row(0), Row(3), Row(null), Row(null))
+
+    checkAnswer(df.selectExpr("json_array_length(a)"), expected)
+    checkAnswer(df.select(json_array_length($"a")), expected)
+  }
+
+  test("json_object_keys function") {
+    val df = Seq(null, "{}", "{\"key1\":1, \"key2\": 2}", "[1, 2, 3]", "invalid json")
+      .toDF("a")
+
+    val expected = Seq(Row(null), Row(Seq.empty),
+      Row(Seq("key1", "key2")), Row(null), Row(null))
+
+    checkAnswer(df.selectExpr("json_object_keys(a)"), expected)
+    checkAnswer(df.select(json_object_keys($"a")), expected)
+  }
 }

