You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2023/06/26 16:07:17 UTC

[spark] branch master updated: [SPARK-43924][CONNECT][PYTHON] Add misc functions to Scala and Python

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a0ae19ee50 [SPARK-43924][CONNECT][PYTHON] Add misc functions to Scala and Python
6a0ae19ee50 is described below

commit 6a0ae19ee509f3246767dda3b43ca76b2254d564
Author: panbingkun <pb...@gmail.com>
AuthorDate: Mon Jun 26 09:06:42 2023 -0700

    [SPARK-43924][CONNECT][PYTHON] Add misc functions to Scala and Python
    
    ### What changes were proposed in this pull request?
    Add following functions:
    
    - uuid
    - aes_encrypt
    - aes_decrypt
    - sha
    - input_file_block_length
    - input_file_block_start
    - reflect
    - java_method
    - version
    - typeof
    - stack
    - random
    
    to:
    
    - Scala API
    - Python API
    - Spark Connect Scala Client
    - Spark Connect Python Client
    
    ### Why are the changes needed?
    for parity
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, new functions.
    
    ### How was this patch tested?
    - Add New UT.
    
    Closes #41689 from panbingkun/SPARK-43924.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     | 253 +++++++++++++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  79 ++++
 .../explain-results/function_aes_decrypt.explain   |   2 +
 .../function_aes_decrypt_with_mode.explain         |   2 +
 .../function_aes_decrypt_with_mode_padding.explain |   2 +
 ...ction_aes_decrypt_with_mode_padding_aad.explain |   2 +
 .../explain-results/function_aes_encrypt.explain   |   2 +
 .../function_aes_encrypt_with_mode.explain         |   2 +
 .../function_aes_encrypt_with_mode_padding.explain |   2 +
 ...nction_aes_encrypt_with_mode_padding_iv.explain |   2 +
 ...on_aes_encrypt_with_mode_padding_iv_aad.explain |   2 +
 .../function_input_file_block_length.explain       |   2 +
 .../function_input_file_block_start.explain        |   2 +
 .../explain-results/function_java_method.explain   |   2 +
 .../function_random_with_seed.explain              |   2 +
 .../explain-results/function_reflect.explain       |   2 +
 .../explain-results/function_sha.explain           |   2 +
 .../explain-results/function_stack.explain         |   3 +
 .../explain-results/function_typeof.explain        |   2 +
 .../query-tests/queries/function_aes_decrypt.json  |  29 ++
 .../queries/function_aes_decrypt.proto.bin         | Bin 0 -> 187 bytes
 .../queries/function_aes_decrypt_with_mode.json    |  33 ++
 .../function_aes_decrypt_with_mode.proto.bin       | Bin 0 -> 194 bytes
 .../function_aes_decrypt_with_mode_padding.json    |  37 ++
 ...unction_aes_decrypt_with_mode_padding.proto.bin | Bin 0 -> 201 bytes
 ...function_aes_decrypt_with_mode_padding_aad.json |  41 +++
 ...ion_aes_decrypt_with_mode_padding_aad.proto.bin | Bin 0 -> 208 bytes
 .../query-tests/queries/function_aes_encrypt.json  |  29 ++
 .../queries/function_aes_encrypt.proto.bin         | Bin 0 -> 187 bytes
 .../queries/function_aes_encrypt_with_mode.json    |  33 ++
 .../function_aes_encrypt_with_mode.proto.bin       | Bin 0 -> 194 bytes
 .../function_aes_encrypt_with_mode_padding.json    |  37 ++
 ...unction_aes_encrypt_with_mode_padding.proto.bin | Bin 0 -> 201 bytes
 .../function_aes_encrypt_with_mode_padding_iv.json |  41 +++
 ...tion_aes_encrypt_with_mode_padding_iv.proto.bin | Bin 0 -> 210 bytes
 ...ction_aes_encrypt_with_mode_padding_iv_aad.json |  45 +++
 ..._aes_encrypt_with_mode_padding_iv_aad.proto.bin | Bin 0 -> 217 bytes
 .../queries/function_input_file_block_length.json  |  20 ++
 .../function_input_file_block_length.proto.bin     | Bin 0 -> 185 bytes
 .../queries/function_input_file_block_start.json   |  20 ++
 .../function_input_file_block_start.proto.bin      | Bin 0 -> 184 bytes
 .../query-tests/queries/function_java_method.json  |  33 ++
 .../queries/function_java_method.proto.bin         | Bin 0 -> 216 bytes
 .../queries/function_random_with_seed.json         |  25 ++
 .../queries/function_random_with_seed.proto.bin    | Bin 0 -> 174 bytes
 .../query-tests/queries/function_reflect.json      |  33 ++
 .../query-tests/queries/function_reflect.proto.bin | Bin 0 -> 212 bytes
 .../query-tests/queries/function_sha.json          |  25 ++
 .../query-tests/queries/function_sha.proto.bin     | Bin 0 -> 172 bytes
 .../query-tests/queries/function_stack.json        |  37 ++
 .../query-tests/queries/function_stack.proto.bin   | Bin 0 -> 194 bytes
 .../query-tests/queries/function_typeof.json       |  25 ++
 .../query-tests/queries/function_typeof.proto.bin  | Bin 0 -> 175 bytes
 .../source/reference/pyspark.sql/functions.rst     |  12 +
 python/pyspark/sql/connect/functions.py            | 111 ++++++
 python/pyspark/sql/functions.py                    | 397 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     | 292 +++++++++++++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |   6 +-
 .../org/apache/spark/sql/MiscFunctionsSuite.scala  | 152 +++++++-
 59 files changed, 1874 insertions(+), 6 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index d258abcecfa..ed0c13b2145 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3282,6 +3282,259 @@ object functions {
    */
   def user(): Column = Column.fn("user")
 
+  /**
+   * Returns a universally unique identifier (UUID) string. The value is returned as a canonical
+   * UUID 36-character string.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def uuid(): Column = Column.fn("uuid")
+
+  /**
+   * Returns an encrypted value of `input` using AES in given `mode` with the specified `padding`.
+   * Key lengths of 16, 24 and 32 bytes are supported. Supported combinations of (`mode`,
+   * `padding`) are ('ECB', 'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional initialization
+   * vectors (IVs) are only supported for CBC and GCM modes. These must be 16 bytes for CBC and 12
+   * bytes for GCM. If not provided, a random vector will be generated and prepended to the
+   * output. Optional additional authenticated data (AAD) is only supported for GCM. If provided
+   * for encryption, the identical AAD value must be provided for decryption. The default mode is
+   * GCM.
+   *
+   * @param input
+   *   The binary value to encrypt.
+   * @param key
+   *   The passphrase to use to encrypt the data.
+   * @param mode
+   *   Specifies which block cipher mode should be used to encrypt messages. Valid modes: ECB,
+   *   GCM, CBC.
+   * @param padding
+   *   Specifies how to pad messages whose length is not a multiple of the block size. Valid
+   *   values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+   *   for CBC.
+   * @param iv
+   *   Optional initialization vector. Only supported for CBC and GCM modes. Valid values: None or
+   *   "". 16-byte array for CBC mode. 12-byte array for GCM mode.
+   * @param aad
+   *   Optional additional authenticated data. Only supported for GCM mode. This can be any
+   *   free-form input and must be provided for both encryption and decryption.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column,
+      iv: Column,
+      aad: Column): Column = Column.fn("aes_encrypt", input, key, mode, padding, iv, aad)
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column, mode: Column, padding: Column, iv: Column): Column =
+    Column.fn("aes_encrypt", input, key, mode, padding, iv)
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column, mode: Column, padding: Column): Column =
+    Column.fn("aes_encrypt", input, key, mode, padding)
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column, mode: Column): Column =
+    Column.fn("aes_encrypt", input, key, mode)
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column): Column =
+    Column.fn("aes_encrypt", input, key)
+
+  /**
+   * Returns a decrypted value of `input` using AES in `mode` with `padding`. Key lengths of 16,
+   * 24 and 32 bytes are supported. Supported combinations of (`mode`, `padding`) are ('ECB',
+   * 'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional additional authenticated data (AAD) is
+   * only supported for GCM. If provided for encryption, the identical AAD value must be provided
+   * for decryption. The default mode is GCM.
+   *
+   * @param input
+   *   The binary value to decrypt.
+   * @param key
+   *   The passphrase to use to decrypt the data.
+   * @param mode
+   *   Specifies which block cipher mode should be used to decrypt messages. Valid modes: ECB,
+   *   GCM, CBC.
+   * @param padding
+   *   Specifies how to pad messages whose length is not a multiple of the block size. Valid
+   *   values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+   *   for CBC.
+   * @param aad
+   *   Optional additional authenticated data. Only supported for GCM mode. This can be any
+   *   free-form input and must be provided for both encryption and decryption.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column,
+      aad: Column): Column =
+    Column.fn("aes_decrypt", input, key, mode, padding, aad)
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(input: Column, key: Column, mode: Column, padding: Column): Column =
+    Column.fn("aes_decrypt", input, key, mode, padding)
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(input: Column, key: Column, mode: Column): Column =
+    Column.fn("aes_decrypt", input, key, mode)
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(input: Column, key: Column): Column =
+    Column.fn("aes_decrypt", input, key)
+
+  /**
+   * Returns a sha1 hash value as a hex string of the `col`.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def sha(col: Column): Column = Column.fn("sha", col)
+
+  /**
+   * Returns the length of the block being read, or -1 if not available.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def input_file_block_length(): Column = Column.fn("input_file_block_length")
+
+  /**
+   * Returns the start offset of the block being read, or -1 if not available.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def input_file_block_start(): Column = Column.fn("input_file_block_start")
+
+  /**
+   * Calls a method with reflection.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def reflect(cols: Column*): Column = Column.fn("reflect", cols: _*)
+
+  /**
+   * Calls a method with reflection.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def java_method(cols: Column*): Column = Column.fn("java_method", cols: _*)
+
+  /**
+   * Returns the Spark version. The string contains 2 fields, the first being a release version
+   * and the second being a git revision.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def version(): Column = Column.fn("version")
+
+  /**
+   * Return DDL-formatted type string for the data type of the input.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def typeof(col: Column): Column = Column.fn("typeof", col)
+
+  /**
+   * Separates `col1`, ..., `colk` into `n` rows, where `n` is the first column in `cols`. Uses
+   * column names col0, col1, etc. by default unless specified otherwise.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def stack(cols: Column*): Column = Column.fn("stack", cols: _*)
+
+  /**
+   * Returns a random value with independent and identically distributed (i.i.d.) uniformly
+   * distributed values in [0, 1).
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def random(seed: Column): Column = Column.fn("random", seed)
+
+  /**
+   * Returns a random value with independent and identically distributed (i.i.d.) uniformly
+   * distributed values in [0, 1).
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def random(): Column = Column.fn("random")
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // String functions
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 53db026f340..e8d04f37d7f 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2670,6 +2670,85 @@ class PlanGenerationTestSuite
     fn.right(fn.col("g"), fn.col("g"))
   }
 
+  functionTest("aes_encrypt with mode padding iv aad") {
+    fn.aes_encrypt(
+      fn.col("g"),
+      fn.col("g"),
+      fn.col("g"),
+      fn.col("g"),
+      fn.lit(Array(67.toByte, 68.toByte, 69.toByte)),
+      fn.col("g"))
+  }
+
+  functionTest("aes_encrypt with mode padding iv") {
+    fn.aes_encrypt(
+      fn.col("g"),
+      fn.col("g"),
+      fn.col("g"),
+      fn.col("g"),
+      fn.lit(Array(67.toByte, 68.toByte, 69.toByte)))
+  }
+
+  functionTest("aes_encrypt with mode padding") {
+    fn.aes_encrypt(fn.col("g"), fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_encrypt with mode") {
+    fn.aes_encrypt(fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_encrypt") {
+    fn.aes_encrypt(fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_decrypt with mode padding aad") {
+    fn.aes_decrypt(fn.col("g"), fn.col("g"), fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_decrypt with mode padding") {
+    fn.aes_decrypt(fn.col("g"), fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_decrypt with mode") {
+    fn.aes_decrypt(fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("aes_decrypt") {
+    fn.aes_decrypt(fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("sha") {
+    fn.sha(fn.col("g"))
+  }
+
+  functionTest("input_file_block_length") {
+    fn.input_file_block_length()
+  }
+
+  functionTest("input_file_block_start") {
+    fn.input_file_block_start()
+  }
+
+  functionTest("reflect") {
+    fn.reflect(lit("java.util.UUID"), lit("fromString"), fn.col("g"))
+  }
+
+  functionTest("java_method") {
+    fn.java_method(lit("java.util.UUID"), lit("fromString"), fn.col("g"))
+  }
+
+  functionTest("typeof") {
+    fn.typeof(fn.col("g"))
+  }
+
+  functionTest("stack") {
+    fn.stack(lit(2), fn.col("g"), fn.col("g"), fn.col("g"))
+  }
+
+  functionTest("random with seed") {
+    fn.random(lit(1))
+  }
+
   test("groupby agg") {
     simple
       .groupBy(Column("id"))
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt.explain
new file mode 100644
index 00000000000..44084a8e60f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), GCM, DEFAULT, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, GCM, DEFAULT, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode.explain
new file mode 100644
index 00000000000..29ccf0c1c83
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, DEFAULT, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, DEFAULT, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding.explain
new file mode 100644
index 00000000000..5591363426a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, g, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding_aad.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding_aad.explain
new file mode 100644
index 00000000000..0e8d4df71b3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_decrypt_with_mode_padding_aad.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, cast(g#0 as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, g, g, )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt.explain
new file mode 100644
index 00000000000..44084a8e60f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), GCM, DEFAULT, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, GCM, DEFAULT, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode.explain
new file mode 100644
index 00000000000..29ccf0c1c83
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, DEFAULT, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, DEFAULT, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding.explain
new file mode 100644
index 00000000000..5591363426a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, cast( as binary), cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, g, , )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv.explain
new file mode 100644
index 00000000000..54b08d7bdb4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, 0x434445, cast( as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, g, X'434445', )#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv_aad.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv_aad.explain
new file mode 100644
index 00000000000..024089170bc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aes_encrypt_with_mode_padding_iv_aad.explain
@@ -0,0 +1,2 @@
+Project [staticinvoke(class org.apache.spark.sql.catalyst.expressions.ExpressionImplUtils, BinaryType, aesEncrypt, cast(g#0 as binary), cast(g#0 as binary), g#0, g#0, 0x434445, cast(g#0 as binary), BinaryType, BinaryType, StringType, StringType, BinaryType, BinaryType, true, true, true) AS aes_encrypt(g, g, g, g, X'434445', g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_length.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_length.explain
new file mode 100644
index 00000000000..24df63ae47d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_length.explain
@@ -0,0 +1,2 @@
+Project [input_file_block_length() AS input_file_block_length()#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_start.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_start.explain
new file mode 100644
index 00000000000..749be1b852e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_input_file_block_start.explain
@@ -0,0 +1,2 @@
+Project [input_file_block_start() AS input_file_block_start()#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_java_method.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_java_method.explain
new file mode 100644
index 00000000000..0d467be225f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_java_method.explain
@@ -0,0 +1,2 @@
+Project [java_method(java.util.UUID, fromString, g#0) AS java_method(java.util.UUID, fromString, g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_random_with_seed.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_random_with_seed.explain
new file mode 100644
index 00000000000..81c81e95c2b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_random_with_seed.explain
@@ -0,0 +1,2 @@
+Project [random(1) AS rand(1)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_reflect.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reflect.explain
new file mode 100644
index 00000000000..f52d3e1b0ff
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reflect.explain
@@ -0,0 +1,2 @@
+Project [reflect(java.util.UUID, fromString, g#0) AS reflect(java.util.UUID, fromString, g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_sha.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sha.explain
new file mode 100644
index 00000000000..22944b4ac08
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sha.explain
@@ -0,0 +1,2 @@
+Project [sha(cast(g#0 as binary)) AS sha(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_stack.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_stack.explain
new file mode 100644
index 00000000000..702e991424c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_stack.explain
@@ -0,0 +1,3 @@
+Project [col0#0, col1#0]
++- Generate stack(2, g#0, g#0, g#0), false, [col0#0, col1#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_typeof.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_typeof.explain
new file mode 100644
index 00000000000..bd55e13192f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_typeof.explain
@@ -0,0 +1,2 @@
+Project [typeof(g#0) AS typeof(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.json
new file mode 100644
index 00000000000..06469d48405
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.proto.bin
new file mode 100644
index 00000000000..c7a70b51707
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.json
new file mode 100644
index 00000000000..7eb9b4ed8b4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.proto.bin
new file mode 100644
index 00000000000..ecd81ae44fc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.json
new file mode 100644
index 00000000000..59a6a5e35fd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.proto.bin
new file mode 100644
index 00000000000..9de01ddc5ea
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.json
new file mode 100644
index 00000000000..a87ec1b7f4d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.json
@@ -0,0 +1,41 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.proto.bin
new file mode 100644
index 00000000000..13da507fe6f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_decrypt_with_mode_padding_aad.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.json
new file mode 100644
index 00000000000..06469d48405
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.json
@@ -0,0 +1,29 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.proto.bin
new file mode 100644
index 00000000000..c7a70b51707
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.json
new file mode 100644
index 00000000000..7eb9b4ed8b4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.proto.bin
new file mode 100644
index 00000000000..ecd81ae44fc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.json
new file mode 100644
index 00000000000..59a6a5e35fd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.proto.bin
new file mode 100644
index 00000000000..9de01ddc5ea
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.json
new file mode 100644
index 00000000000..285c67289d3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.json
@@ -0,0 +1,41 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "binary": "Q0RF"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.proto.bin
new file mode 100644
index 00000000000..812426f3c00
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.json
new file mode 100644
index 00000000000..eb0e178fd35
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.json
@@ -0,0 +1,45 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aes_encrypt",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "binary": "Q0RF"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.proto.bin
new file mode 100644
index 00000000000..ee39beb07ce
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aes_encrypt_with_mode_padding_iv_aad.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.json b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.json
new file mode 100644
index 00000000000..2b478579f37
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.json
@@ -0,0 +1,20 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "input_file_block_length"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.proto.bin
new file mode 100644
index 00000000000..55684ba7d1b
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_length.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.json b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.json
new file mode 100644
index 00000000000..a85f58c3b9a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.json
@@ -0,0 +1,20 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "input_file_block_start"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.proto.bin
new file mode 100644
index 00000000000..6fa8027cc82
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_input_file_block_start.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.json b/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.json
new file mode 100644
index 00000000000..196dd486957
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "java_method",
+        "arguments": [{
+          "literal": {
+            "string": "java.util.UUID"
+          }
+        }, {
+          "literal": {
+            "string": "fromString"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.proto.bin
new file mode 100644
index 00000000000..b5cd2ea0e92
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_java_method.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.json b/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.json
new file mode 100644
index 00000000000..11238a43ec1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "random",
+        "arguments": [{
+          "literal": {
+            "integer": 1
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.proto.bin
new file mode 100644
index 00000000000..aa4208afedb
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_random_with_seed.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.json b/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.json
new file mode 100644
index 00000000000..2b0fe791115
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "reflect",
+        "arguments": [{
+          "literal": {
+            "string": "java.util.UUID"
+          }
+        }, {
+          "literal": {
+            "string": "fromString"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.proto.bin
new file mode 100644
index 00000000000..31c6c9bf131
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_reflect.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sha.json b/connector/connect/common/src/test/resources/query-tests/queries/function_sha.json
new file mode 100644
index 00000000000..57c5cb5bbd2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_sha.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "sha",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sha.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_sha.proto.bin
new file mode 100644
index 00000000000..e99760e4922
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_sha.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_stack.json b/connector/connect/common/src/test/resources/query-tests/queries/function_stack.json
new file mode 100644
index 00000000000..14865c72df2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_stack.json
@@ -0,0 +1,37 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "stack",
+        "arguments": [{
+          "literal": {
+            "integer": 2
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_stack.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_stack.proto.bin
new file mode 100644
index 00000000000..5e5e12478d6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_stack.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.json b/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.json
new file mode 100644
index 00000000000..7a6fcfcbcf8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.json
@@ -0,0 +1,25 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "typeof",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.proto.bin
new file mode 100644
index 00000000000..a042a6e8d76
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_typeof.proto.bin differ
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index cf581f40e7a..8e8561920c5 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -467,11 +467,16 @@ Misc Functions
 .. autosummary::
     :toctree: api/
 
+    aes_decrypt
+    aes_encrypt
     current_catalog
     current_database
     current_schema
     current_user
+    input_file_block_length
+    input_file_block_start
     md5
+    sha
     sha1
     sha2
     crc32
@@ -479,9 +484,16 @@ Misc Functions
     xxhash64
     assert_true
     raise_error
+    reflect
     hll_sketch_estimate
     hll_union
+    java_method
+    random
+    stack
+    typeof
     user
+    uuid
+    version
 
 Predicate Functions
 -------------------
diff --git a/python/pyspark/sql/connect/functions.py b/python/pyspark/sql/connect/functions.py
index aaba75cc107..134465a7882 100644
--- a/python/pyspark/sql/connect/functions.py
+++ b/python/pyspark/sql/connect/functions.py
@@ -3603,6 +3603,117 @@ def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Co
 nvl2.__doc__ = pysparkfuncs.nvl2.__doc__
 
 
+def uuid() -> Column:
+    return _invoke_function_over_columns("uuid")
+
+
+uuid.__doc__ = pysparkfuncs.uuid.__doc__
+
+
+def aes_encrypt(
+    input: "ColumnOrName",
+    key: "ColumnOrName",
+    mode: Optional["ColumnOrName"] = None,
+    padding: Optional["ColumnOrName"] = None,
+    iv: Optional["ColumnOrName"] = None,
+    aad: Optional["ColumnOrName"] = None,
+) -> Column:
+    _mode = lit("GCM") if mode is None else _to_col(mode)
+    _padding = lit("DEFAULT") if padding is None else _to_col(padding)
+    _iv = lit("") if iv is None else _to_col(iv)
+    _aad = lit("") if aad is None else _to_col(aad)
+
+    return _invoke_function_over_columns("aes_encrypt", input, key, _mode, _padding, _iv, _aad)
+
+
+aes_encrypt.__doc__ = pysparkfuncs.aes_encrypt.__doc__
+
+
+def aes_decrypt(
+    input: "ColumnOrName",
+    key: "ColumnOrName",
+    mode: Optional["ColumnOrName"] = None,
+    padding: Optional["ColumnOrName"] = None,
+    aad: Optional["ColumnOrName"] = None,
+) -> Column:
+    _mode = lit("GCM") if mode is None else _to_col(mode)
+    _padding = lit("DEFAULT") if padding is None else _to_col(padding)
+    _aad = lit("") if aad is None else _to_col(aad)
+
+    return _invoke_function_over_columns("aes_decrypt", input, key, _mode, _padding, _aad)
+
+
+aes_decrypt.__doc__ = pysparkfuncs.aes_decrypt.__doc__
+
+
+def sha(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("sha", col)
+
+
+sha.__doc__ = pysparkfuncs.sha.__doc__
+
+
+def input_file_block_length() -> Column:
+    return _invoke_function_over_columns("input_file_block_length")
+
+
+input_file_block_length.__doc__ = pysparkfuncs.input_file_block_length.__doc__
+
+
+def input_file_block_start() -> Column:
+    return _invoke_function_over_columns("input_file_block_start")
+
+
+input_file_block_start.__doc__ = pysparkfuncs.input_file_block_start.__doc__
+
+
+def reflect(*cols: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("reflect", *cols)
+
+
+reflect.__doc__ = pysparkfuncs.reflect.__doc__
+
+
+def java_method(*cols: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("java_method", *cols)
+
+
+java_method.__doc__ = pysparkfuncs.java_method.__doc__
+
+
+def version() -> Column:
+    return _invoke_function_over_columns("version")
+
+
+version.__doc__ = pysparkfuncs.version.__doc__
+
+
+def typeof(col: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("typeof", col)
+
+
+typeof.__doc__ = pysparkfuncs.typeof.__doc__
+
+
+def stack(*cols: "ColumnOrName") -> Column:
+    return _invoke_function_over_columns("stack", *cols)
+
+
+stack.__doc__ = pysparkfuncs.stack.__doc__
+
+
+def random(
+    seed: Optional["ColumnOrName"] = None,
+) -> Column:
+    if seed is not None:
+        return _invoke_function_over_columns("random", seed)
+    else:
+        return _invoke_function_over_columns("random")
+
+
+random.__doc__ = pysparkfuncs.random.__doc__
+
+
 # User Defined Function
 
 
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 119a8b96bb4..c753b59bec5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -14394,6 +14394,403 @@ def nvl2(col1: "ColumnOrName", col2: "ColumnOrName", col3: "ColumnOrName") -> Co
     return _invoke_function_over_columns("nvl2", col1, col2, col3)
 
 
+@try_remote_functions
+def uuid() -> Column:
+    """
+    Returns a universally unique identifier (UUID) string. The value is returned as a canonical
+    UUID 36-character string.
+
+    .. versionadded:: 3.5.0
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(uuid()).show(truncate=False) # doctest: +SKIP
+    +------------------------------------+
+    |uuid()                              |
+    +------------------------------------+
+    |3dcc5174-9da9-41ca-815f-34c05c6d3926|
+    +------------------------------------+
+    """
+    return _invoke_function_over_columns("uuid")
+
+
+@try_remote_functions
+def aes_encrypt(
+    input: "ColumnOrName",
+    key: "ColumnOrName",
+    mode: Optional["ColumnOrName"] = None,
+    padding: Optional["ColumnOrName"] = None,
+    iv: Optional["ColumnOrName"] = None,
+    aad: Optional["ColumnOrName"] = None,
+) -> Column:
+    """
+    Returns an encrypted value of `input` using AES in given `mode` with the specified `padding`.
+    Key lengths of 16, 24 and 32 bytes are supported. Supported combinations of (`mode`,
+    `padding`) are ('ECB', 'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional initialization
+    vectors (IVs) are only supported for CBC and GCM modes. These must be 16 bytes for CBC and 12
+    bytes for GCM. If not provided, a random vector will be generated and prepended to the
+    output. Optional additional authenticated data (AAD) is only supported for GCM. If provided
+    for encryption, the identical AAD value must be provided for decryption. The default mode is
+    GCM.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    input : :class:`~pyspark.sql.Column` or str
+        The binary value to encrypt.
+    key : :class:`~pyspark.sql.Column` or str
+        The passphrase to use to encrypt the data.
+    mode : :class:`~pyspark.sql.Column` or str, optional
+        Specifies which block cipher mode should be used to encrypt messages. Valid modes: ECB,
+        GCM, CBC.
+    padding : :class:`~pyspark.sql.Column` or str, optional
+        Specifies how to pad messages whose length is not a multiple of the block size. Valid
+        values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+        for CBC.
+    iv : :class:`~pyspark.sql.Column` or str, optional
+        Optional initialization vector. Only supported for CBC and GCM modes. Valid values: None or
+        "". 16-byte array for CBC mode. 12-byte array for GCM mode.
+    aad : :class:`~pyspark.sql.Column` or str, optional
+        Optional additional authenticated data. Only supported for GCM mode. This can be any
+        free-form input and must be provided for both encryption and decryption.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(
+    ...     "Spark", "abcdefghijklmnop12345678ABCDEFGH", "GCM", "DEFAULT",
+    ...     "000000000000000000000000", "This is an AAD mixed into the input",)],
+    ...     ["input", "key", "mode", "padding", "iv", "aad"]
+    ... )
+    >>> df.select(base64(aes_encrypt(
+    ...     df.input, df.key, df.mode, df.padding, to_binary(df.iv, lit("hex")), df.aad)
+    ... ).alias('r')).collect()
+    [Row(r='AAAAAAAAAAAAAAAAQiYi+sTLm7KD9UcZ2nlRdYDe/PX4')]
+
+    >>> df.select(base64(aes_encrypt(
+    ...     df.input, df.key, df.mode, df.padding, to_binary(df.iv, lit("hex")))
+    ... ).alias('r')).collect()
+    [Row(r='AAAAAAAAAAAAAAAAQiYi+sRNYDAOTjdSEcYBFsAWPL1f')]
+
+    >>> df = spark.createDataFrame([(
+    ...     "Spark SQL", "1234567890abcdef", "ECB", "PKCS",)],
+    ...     ["input", "key", "mode", "padding"]
+    ... )
+    >>> df.select(aes_decrypt(aes_encrypt(df.input, df.key, df.mode, df.padding),
+    ...     df.key, df.mode, df.padding).alias('r')
+    ... ).collect()
+    [Row(r=bytearray(b'Spark SQL'))]
+
+    >>> df = spark.createDataFrame([(
+    ...     "Spark SQL", "0000111122223333", "ECB",)],
+    ...     ["input", "key", "mode"]
+    ... )
+    >>> df.select(aes_decrypt(aes_encrypt(df.input, df.key, df.mode),
+    ...     df.key, df.mode).alias('r')
+    ... ).collect()
+    [Row(r=bytearray(b'Spark SQL'))]
+
+    >>> df = spark.createDataFrame([(
+    ...     "Spark SQL", "abcdefghijklmnop",)],
+    ...     ["input", "key"]
+    ... )
+    >>> df.select(aes_decrypt(
+    ...     unbase64(base64(aes_encrypt(df.input, df.key))), df.key
+    ... ).cast("STRING").alias('r')).collect()
+    [Row(r='Spark SQL')]
+    """
+    _mode = lit("GCM") if mode is None else mode
+    _padding = lit("DEFAULT") if padding is None else padding
+    _iv = lit("") if iv is None else iv
+    _aad = lit("") if aad is None else aad
+    return _invoke_function_over_columns("aes_encrypt", input, key, _mode, _padding, _iv, _aad)
+
+
+@try_remote_functions
+def aes_decrypt(
+    input: "ColumnOrName",
+    key: "ColumnOrName",
+    mode: Optional["ColumnOrName"] = None,
+    padding: Optional["ColumnOrName"] = None,
+    aad: Optional["ColumnOrName"] = None,
+) -> Column:
+    """
+    Returns a decrypted value of `input` using AES in `mode` with `padding`. Key lengths of 16,
+    24 and 32 bytes are supported. Supported combinations of (`mode`, `padding`) are ('ECB',
+    'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional additional authenticated data (AAD) is
+    only supported for GCM. If provided for encryption, the identical AAD value must be provided
+    for decryption. The default mode is GCM.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    input : :class:`~pyspark.sql.Column` or str
+        The binary value to decrypt.
+    key : :class:`~pyspark.sql.Column` or str
+        The passphrase to use to decrypt the data.
+    mode : :class:`~pyspark.sql.Column` or str, optional
+        Specifies which block cipher mode should be used to decrypt messages. Valid modes: ECB,
+        GCM, CBC.
+    padding : :class:`~pyspark.sql.Column` or str, optional
+        Specifies how to pad messages whose length is not a multiple of the block size. Valid
+        values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+        for CBC.
+    aad : :class:`~pyspark.sql.Column` or str, optional
+        Optional additional authenticated data. Only supported for GCM mode. This can be any
+        free-form input and must be provided for both encryption and decryption.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(
+    ...     "AAAAAAAAAAAAAAAAQiYi+sTLm7KD9UcZ2nlRdYDe/PX4",
+    ...     "abcdefghijklmnop12345678ABCDEFGH", "GCM", "DEFAULT",
+    ...     "This is an AAD mixed into the input",)],
+    ...     ["input", "key", "mode", "padding", "aad"]
+    ... )
+    >>> df.select(aes_decrypt(
+    ...     unbase64(df.input), df.key, df.mode, df.padding, df.aad).alias('r')
+    ... ).collect()
+    [Row(r=bytearray(b'Spark'))]
+
+    >>> df = spark.createDataFrame([(
+    ...     "AAAAAAAAAAAAAAAAAAAAAPSd4mWyMZ5mhvjiAPQJnfg=",
+    ...     "abcdefghijklmnop12345678ABCDEFGH", "CBC", "DEFAULT",)],
+    ...     ["input", "key", "mode", "padding"]
+    ... )
+    >>> df.select(aes_decrypt(
+    ...     unbase64(df.input), df.key, df.mode, df.padding).alias('r')
+    ... ).collect()
+    [Row(r=bytearray(b'Spark'))]
+
+    >>> df.select(aes_decrypt(unbase64(df.input), df.key, df.mode).alias('r')).collect()
+    [Row(r=bytearray(b'Spark'))]
+
+    >>> df = spark.createDataFrame([(
+    ...     "83F16B2AA704794132802D248E6BFD4E380078182D1544813898AC97E709B28A94",
+    ...     "0000111122223333",)],
+    ...     ["input", "key"]
+    ... )
+    >>> df.select(aes_decrypt(unhex(df.input), df.key).alias('r')).collect()
+    [Row(r=bytearray(b'Spark'))]
+    """
+    _mode = lit("GCM") if mode is None else mode
+    _padding = lit("DEFAULT") if padding is None else padding
+    _aad = lit("") if aad is None else aad
+    return _invoke_function_over_columns("aes_decrypt", input, key, _mode, _padding, _aad)
+
+
+@try_remote_functions
+def sha(col: "ColumnOrName") -> Column:
+    """
+    Returns a sha1 hash value as a hex string of the `col`.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("Spark",)], ["a"])
+    >>> df.select(sha(df.a).alias('r')).collect()
+    [Row(r='85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c')]
+    """
+    return _invoke_function_over_columns("sha", col)
+
+
+@try_remote_functions
+def input_file_block_length() -> Column:
+    """
+    Returns the length of the block being read, or -1 if not available.
+
+    .. versionadded:: 3.5.0
+
+    Examples
+    --------
+    >>> df = spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",")
+    >>> df.select(input_file_block_length().alias('r')).first()
+    Row(r=87)
+    """
+    return _invoke_function_over_columns("input_file_block_length")
+
+
+@try_remote_functions
+def input_file_block_start() -> Column:
+    """
+    Returns the start offset of the block being read, or -1 if not available.
+
+    .. versionadded:: 3.5.0
+
+    Examples
+    --------
+    >>> df = spark.read.text("python/test_support/sql/ages_newlines.csv", lineSep=",")
+    >>> df.select(input_file_block_start().alias('r')).first()
+    Row(r=0)
+    """
+    return _invoke_function_over_columns("input_file_block_start")
+
+
+@try_remote_functions
+def reflect(*cols: "ColumnOrName") -> Column:
+    """
+    Calls a method with reflection.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        the first element should be a literal string for the class name,
+        and the second element should be a literal string for the method name,
+        and the remaining are input arguments to the Java method.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2",)], ["a"])
+    >>> df.select(
+    ...     reflect(lit("java.util.UUID"), lit("fromString"), df.a).alias('r')
+    ... ).collect()
+    [Row(r='a5cf6c42-0c85-418f-af6c-3e4e5b1328f2')]
+    """
+    return _invoke_function_over_seq_of_columns("reflect", cols)
+
+
+@try_remote_functions
+def java_method(*cols: "ColumnOrName") -> Column:
+    """
+    Calls a method with reflection.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        the first element should be a literal string for the class name,
+        and the second element should be a literal string for the method name,
+        and the remaining are input arguments to the Java method.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2",)], ["a"])
+    >>> df.select(
+    ...     java_method(lit("java.util.UUID"), lit("fromString"), df.a).alias('r')
+    ... ).collect()
+    [Row(r='a5cf6c42-0c85-418f-af6c-3e4e5b1328f2')]
+
+    """
+    return _invoke_function_over_seq_of_columns("java_method", cols)
+
+
+@try_remote_functions
+def version() -> Column:
+    """
+    Returns the Spark version. The string contains 2 fields, the first being a release version
+    and the second being a git revision.
+
+    .. versionadded:: 3.5.0
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(version()).show(truncate=False) # doctest: +SKIP
+    +----------------------------------------------+
+    |version()                                     |
+    +----------------------------------------------+
+    |3.5.0 cafbea5b13623276517a9d716f75745eff91f616|
+    +----------------------------------------------+
+    """
+    return _invoke_function_over_columns("version")
+
+
+@try_remote_functions
+def typeof(col: "ColumnOrName") -> Column:
+    """
+    Return DDL-formatted type string for the data type of the input.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1,)], ["a"])
+    >>> df.select(typeof(df.a).alias('r')).collect()
+    [Row(r='bigint')]
+    """
+    return _invoke_function_over_columns("typeof", col)
+
+
+@try_remote_functions
+def stack(*cols: "ColumnOrName") -> Column:
+    """
+    Separates `col1`, ..., `colk` into `n` rows. Uses column names col0, col1, etc. by default
+    unless specified otherwise.
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    cols : :class:`~pyspark.sql.Column` or str
+        the first element should be a literal int for the number of rows to be separated,
+        and the remaining are input elements to be separated.
+
+    Examples
+    --------
+    >>> df = spark.createDataFrame([(1, 2, 3)], ["a", "b", "c"])
+    >>> df.select(stack(lit(2), df.a, df.b, df.c)).show(truncate=False)
+    +----+----+
+    |col0|col1|
+    +----+----+
+    |1   |2   |
+    |3   |NULL|
+    +----+----+
+    """
+    return _invoke_function_over_seq_of_columns("stack", cols)
+
+
+@try_remote_functions
+def random(
+    seed: Optional["ColumnOrName"] = None,
+) -> Column:
+    """
+    Returns a random value with independent and identically distributed (i.i.d.) uniformly
+    distributed values in [0, 1).
+
+    .. versionadded:: 3.5.0
+
+    Parameters
+    ----------
+    seed : :class:`~pyspark.sql.Column` or str, optional
+        The seed for the random generator.
+
+    Examples
+    --------
+    >>> df = spark.range(1)
+    >>> df.select(random()).show(truncate=False) # doctest: +SKIP
+    +--------------------+
+    |rand()              |
+    +--------------------+
+    |0.026810514415005593|
+    +--------------------+
+
+    >>> df.select(random(lit(1))).show(truncate=False) # doctest: +SKIP
+    +------------------+
+    |rand(1)           |
+    +------------------+
+    |0.4836508543933039|
+    +------------------+
+    """
+    if seed is not None:
+        return _invoke_function_over_columns("random", seed)
+    else:
+        return _invoke_function_over_columns("random")
+
+
 # ---------------------------- User Defined Function ----------------------------------
 
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 41a3781d2ed..b01bf1c6a3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3326,6 +3326,298 @@ object functions {
    */
   def user(): Column = withExpr { CurrentUser() }
 
+  /**
+   * Returns a universally unique identifier (UUID) string. The value is returned as a canonical
+   * UUID 36-character string.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def uuid(): Column = withExpr { new Uuid() }
+
+  /**
+   * Returns an encrypted value of `input` using AES in given `mode` with the specified `padding`.
+   * Key lengths of 16, 24 and 32 bytes are supported. Supported combinations of (`mode`,
+   * `padding`) are ('ECB', 'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional initialization
+   * vectors (IVs) are only supported for CBC and GCM modes. These must be 16 bytes for CBC and 12
+   * bytes for GCM. If not provided, a random vector will be generated and prepended to the
+   * output. Optional additional authenticated data (AAD) is only supported for GCM. If provided
+   * for encryption, the identical AAD value must be provided for decryption. The default mode is
+   * GCM.
+   *
+   * @param input
+   *   The binary value to encrypt.
+   * @param key
+   *   The passphrase to use to encrypt the data.
+   * @param mode
+   *   Specifies which block cipher mode should be used to encrypt messages. Valid modes: ECB,
+   *   GCM, CBC.
+   * @param padding
+   *   Specifies how to pad messages whose length is not a multiple of the block size. Valid
+   *   values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+   *   for CBC.
+   * @param iv
+   *   Optional initialization vector. Only supported for CBC and GCM modes. Valid values: None or
+   *   "". 16-byte array for CBC mode. 12-byte array for GCM mode.
+   * @param aad
+   *   Optional additional authenticated data. Only supported for GCM mode. This can be any
+   *   free-form input and must be provided for both encryption and decryption.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column,
+      iv: Column,
+      aad: Column): Column = withExpr {
+    AesEncrypt(input.expr, key.expr, mode.expr, padding.expr, iv.expr, aad.expr)
+  }
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column,
+      iv: Column): Column = withExpr {
+    new AesEncrypt(input.expr, key.expr, mode.expr, padding.expr, iv.expr)
+  }
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column, mode: Column, padding: Column): Column = withExpr {
+    new AesEncrypt(input.expr, key.expr, mode.expr, padding.expr)
+  }
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column, mode: Column): Column = withExpr {
+    new AesEncrypt(input.expr, key.expr, mode.expr)
+  }
+
+  /**
+   * Returns an encrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_encrypt(Column, Column, Column, Column, Column,
+   *   Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_encrypt(input: Column, key: Column): Column = withExpr {
+    new AesEncrypt(input.expr, key.expr)
+  }
+
+  /**
+   * Returns a decrypted value of `input` using AES in `mode` with `padding`. Key lengths of 16,
+   * 24 and 32 bytes are supported. Supported combinations of (`mode`, `padding`) are ('ECB',
+   * 'PKCS'), ('GCM', 'NONE') and ('CBC', 'PKCS'). Optional additional authenticated data (AAD) is
+   * only supported for GCM. If provided for encryption, the identical AAD value must be provided
+   * for decryption. The default mode is GCM.
+   *
+   * @param input
+   *   The binary value to decrypt.
+   * @param key
+   *   The passphrase to use to decrypt the data.
+   * @param mode
+   *   Specifies which block cipher mode should be used to decrypt messages. Valid modes: ECB,
+   *   GCM, CBC.
+   * @param padding
+   *   Specifies how to pad messages whose length is not a multiple of the block size. Valid
+   *   values: PKCS, NONE, DEFAULT. The DEFAULT padding means PKCS for ECB, NONE for GCM and PKCS
+   *   for CBC.
+   * @param aad
+   *   Optional additional authenticated data. Only supported for GCM mode. This can be any
+   *   free-form input and must be provided for both encryption and decryption.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column,
+      aad: Column): Column = withExpr {
+    AesDecrypt(input.expr, key.expr, mode.expr, padding.expr, aad.expr)
+  }
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(
+      input: Column,
+      key: Column,
+      mode: Column,
+      padding: Column): Column = withExpr {
+    new AesDecrypt(input.expr, key.expr, mode.expr, padding.expr)
+  }
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(input: Column, key: Column, mode: Column): Column = withExpr {
+    new AesDecrypt(input.expr, key.expr, mode.expr)
+  }
+
+  /**
+   * Returns a decrypted value of `input`.
+   *
+   * @see
+   *   `org.apache.spark.sql.functions.aes_decrypt(Column, Column, Column, Column, Column)`
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def aes_decrypt(input: Column, key: Column): Column = withExpr {
+    new AesDecrypt(input.expr, key.expr)
+  }
+
+  /**
+   * Returns a sha1 hash value as a hex string of the `col`.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def sha(col: Column): Column = withExpr {
+    Sha1(col.expr)
+  }
+
+  /**
+   * Returns the length of the block being read, or -1 if not available.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def input_file_block_length(): Column = withExpr {
+    InputFileBlockLength()
+  }
+
+  /**
+   * Returns the start offset of the block being read, or -1 if not available.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def input_file_block_start(): Column = withExpr {
+    InputFileBlockStart()
+  }
+
+  /**
+   * Calls a method with reflection.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def reflect(cols: Column*): Column = withExpr {
+    CallMethodViaReflection(cols.map(_.expr))
+  }
+
+  /**
+   * Calls a method with reflection.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def java_method(cols: Column*): Column = withExpr {
+    CallMethodViaReflection(cols.map(_.expr))
+  }
+
+  /**
+   * Returns the Spark version. The string contains 2 fields, the first being a release version
+   * and the second being a git revision.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def version(): Column = withExpr {
+    SparkVersion()
+  }
+
+  /**
+   * Return DDL-formatted type string for the data type of the input.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def typeof(col: Column): Column = withExpr {
+    TypeOf(col.expr)
+  }
+
+  /**
+   * Separates `col1`, ..., `colk` into `n` rows. Uses column names col0, col1, etc. by default
+   * unless specified otherwise.
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def stack(cols: Column*): Column = withExpr {
+    Stack(cols.map(_.expr))
+  }
+
+  /**
+   * Returns a random value with independent and identically distributed (i.i.d.) uniformly
+   * distributed values in [0, 1).
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def random(seed: Column): Column = withExpr {
+    Rand(seed.expr)
+  }
+
+  /**
+   * Returns a random value with independent and identically distributed (i.i.d.) uniformly
+   * distributed values in [0, 1).
+   *
+   * @group misc_funcs
+   * @since 3.5.0
+   */
+  def random(): Column = withExpr {
+    new Rand()
+  }
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // String functions
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index e892ca0b567..f1f6480cc08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -75,11 +75,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       "udaf", "udf" // create function statement in sql
     )
 
-    val excludedSqlFunctions = Set(
-      "random", "array_agg", "cardinality", "sha",
-      // aliases for existing functions
-      "reflect", "java_method" // Only needed in SQL
-    )
+    val excludedSqlFunctions = Set("array_agg", "cardinality")
 
     val expectedOnlyDataFrameFunctions = Set(
       "bucket", "days", "hours", "months", "years", // Datasource v2 partition transformations
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
index d498982fb2d..b3a51896385 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MiscFunctionsSuite.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql
 
 import org.apache.spark.{SPARK_REVISION, SPARK_VERSION_SHORT}
+import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.BinaryType
+import org.apache.spark.sql.types.{BinaryType, StringType}
 
 class MiscFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -42,6 +44,8 @@ class MiscFunctionsSuite extends QueryTest with SharedSparkSession {
       df,
       Row(SPARK_VERSION_SHORT + " " + SPARK_REVISION))
     assert(df.schema.fieldNames === Seq("version()"))
+
+    checkAnswer(df.selectExpr("version()"), df.select(version()))
   }
 
   test("SPARK-21957: get current_user in normal spark apps") {
@@ -84,6 +88,152 @@ class MiscFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("uuid") {
+    val df = Seq((1, 2)).toDF("a", "b")
+    assert(df.selectExpr("uuid()").collect() != null)
+    assert(df.select(uuid()).collect() != null)
+  }
+
+  test("aes_encrypt") {
+    val iv = Hex.unhex("000000000000000000000000".getBytes())
+    val df = Seq(("Spark", "abcdefghijklmnop12345678ABCDEFGH",
+      "GCM", "DEFAULT", iv, "This is an AAD mixed into the input")).
+      toDF("input", "key", "mode", "padding", "iv", "aad")
+
+    checkAnswer(
+      df.selectExpr("aes_encrypt(input, key, mode, padding, iv, aad)"),
+      df.select(aes_encrypt(col("input"), col("key"), col("mode"),
+        col("padding"), col("iv"), col("aad"))))
+
+    checkAnswer(
+      df.selectExpr("aes_encrypt(input, key, mode, padding, iv)"),
+      df.select(aes_encrypt(col("input"), col("key"), col("mode"),
+        col("padding"), col("iv"))))
+
+    val df1 = Seq(("Spark SQL", "1234567890abcdef", "ECB", "PKCS")).
+      toDF("input", "key", "mode", "padding")
+
+    checkAnswer(
+      df1.selectExpr("base64(aes_encrypt(input, key, mode, padding))"),
+      df1.select(base64(aes_encrypt(col("input"), col("key"), col("mode"), col("padding")))))
+
+    val df2 = Seq(("Spark SQL", "0000111122223333", "ECB")).toDF("input", "key", "mode")
+
+    checkAnswer(
+      df2.selectExpr("hex(aes_encrypt(input, key, mode))"),
+      df2.select(hex(aes_encrypt(col("input"), col("key"), col("mode")))))
+
+    val df3 = Seq(("Spark", "abcdefghijklmnop")).toDF("input", "key")
+    checkAnswer(
+      df3.selectExpr("cast(aes_decrypt(unbase64(base64(" +
+        "aes_encrypt(input, key))), key) AS STRING)"),
+      Seq(Row("Spark")))
+    checkAnswer(
+      df3.select(aes_decrypt(unbase64(base64(
+        aes_encrypt(col("input"), col("key")))), col("key")).cast(StringType)),
+      Seq(Row("Spark")))
+  }
+
+  test("aes_decrypt") {
+    val df = Seq(("AAAAAAAAAAAAAAAAQiYi+sTLm7KD9UcZ2nlRdYDe/PX4",
+      "abcdefghijklmnop12345678ABCDEFGH", "GCM", "DEFAULT", "This is an AAD mixed into the input"
+    )).toDF("input", "key", "mode", "padding", "aad")
+
+    checkAnswer(
+      df.selectExpr("aes_decrypt(unbase64(input), key, mode, padding, aad)"),
+      df.select(aes_decrypt(unbase64(col("input")), col("key"),
+        col("mode"), col("padding"), col("aad"))))
+
+    val df1 = Seq(("AAAAAAAAAAAAAAAAAAAAAPSd4mWyMZ5mhvjiAPQJnfg=",
+      "abcdefghijklmnop12345678ABCDEFGH", "CBC", "DEFAULT"
+    )).toDF("input", "key", "mode", "padding")
+
+    checkAnswer(
+      df1.selectExpr("aes_decrypt(unbase64(input), key, mode, padding)"),
+      df1.select(aes_decrypt(unbase64(col("input")), col("key"),
+        col("mode"), col("padding"))))
+
+     checkAnswer(
+      df1.selectExpr("aes_decrypt(unbase64(input), key, mode)"),
+      df1.select(aes_decrypt(unbase64(col("input")), col("key"), col("mode"))))
+
+    val df2 = Seq(("83F16B2AA704794132802D248E6BFD4E380078182D1544813898AC97E709B28A94",
+      "0000111122223333")).toDF("input", "key")
+     checkAnswer(
+      df2.selectExpr("aes_decrypt(unhex(input), key)"),
+      df2.select(aes_decrypt(unhex(col("input")), col("key"))))
+  }
+
+  test("sha") {
+    val df = Seq("Spark").toDF("a")
+    checkAnswer(df.selectExpr("sha(a)"), df.select(sha(col("a"))))
+  }
+
+  test("input_file_block_length") {
+    val tableName = "t1"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(a String) USING parquet")
+      sql(s"insert into $tableName values('a')")
+      val df = spark.table(tableName)
+      checkAnswer(
+        df.selectExpr("input_file_block_length()"),
+        df.select(input_file_block_length())
+      )
+    }
+  }
+
+  test("input_file_block_start") {
+    val tableName = "t1"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(a String) USING parquet")
+      sql(s"insert into $tableName values('a')")
+      val df = spark.table(tableName)
+      checkAnswer(
+        df.selectExpr("input_file_block_start()"),
+        df.select(input_file_block_start())
+      )
+    }
+  }
+
+  test("reflect") {
+    val df = Seq("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2").toDF("a")
+    checkAnswer(df.selectExpr("reflect('java.util.UUID', 'fromString', a)"),
+      Seq(Row("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2")))
+    checkAnswer(df.select(reflect(lit("java.util.UUID"), lit("fromString"), col("a"))),
+      Seq(Row("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2")))
+  }
+
+  test("java_method") {
+    val df = Seq("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2").toDF("a")
+    checkAnswer(df.selectExpr("java_method('java.util.UUID', 'fromString', a)"),
+      Seq(Row("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2")))
+    checkAnswer(df.select(java_method(lit("java.util.UUID"), lit("fromString"), col("a"))),
+      Seq(Row("a5cf6c42-0c85-418f-af6c-3e4e5b1328f2")))
+  }
+
+  test("typeof") {
+    val df = Seq(1).toDF("a")
+    checkAnswer(df.selectExpr("typeof(a)"), Seq(Row("int")))
+    checkAnswer(df.select(typeof(col("a"))), Seq(Row("int")))
+  }
+
+  test("stack") {
+    val df = Seq((1, 2, 3)).toDF("a", "b", "c")
+    checkAnswer(df.selectExpr("stack(2, a, b, c)"),
+      Seq(Row(1, 2), Row(3, null)))
+    checkAnswer(df.select(stack(lit(2), col("a"), col("b"), col("c"))),
+      Seq(Row(1, 2), Row(3, null)))
+  }
+
+  test("random") {
+     val df = Seq((1, 2)).toDF("a", "b")
+    assert(df.selectExpr("random()").collect() != null)
+    assert(df.select(random()).collect() != null)
+
+    assert(df.selectExpr("random(1)").collect() != null)
+    assert(df.select(random(lit(1))).collect() != null)
+  }
 }
 
 object ReflectClass {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org