You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/23 15:46:25 UTC

[spark] branch branch-3.4 updated: [SPARK-42531][CONNECT] Scala Client Add Collections Functions

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 0e6df2e9ad6 [SPARK-42531][CONNECT] Scala Client Add Collections Functions
0e6df2e9ad6 is described below

commit 0e6df2e9ad665720c746533bc74b82fbda675a86
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Thu Feb 23 11:45:44 2023 -0400

    [SPARK-42531][CONNECT] Scala Client Add Collections Functions
    
    ### What changes were proposed in this pull request?
    This PR adds all the collections functions to `functions.scala` for the Scala client. This is the last large functions PR; a few functions are still missing and will be added later.
    
    ### Why are the changes needed?
    We want the Scala client to have API parity with the existing API
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it adds functions to the Spark Connect Scala Client.
    
    ### How was this patch tested?
    Added tests to `PlanGenerationTestSuite` and to `ProtoToPlanTestSuite`. I have added a few tests to `ClientE2ETestSuite` for lambda functions (to make sure name scoping works) and the array shuffle function (non-deterministic, hard to test with golden files).
    
    Closes #40130 from hvanhovell/SPARK-42531.
    
    Authored-by: Herman van Hovell <he...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 87e3d5625e76bb734b8dd753bfb25002822c8585)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../scala/org/apache/spark/sql/functions.scala     | 1215 +++++++++++++++++++-
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |   28 +-
 .../org/apache/spark/sql/FunctionTestSuite.scala   |   47 +-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  255 ++++
 .../explain-results/function_aggregate.explain     |    2 +
 .../explain-results/function_array_append.explain  |    2 +
 .../explain-results/function_array_compact.explain |    2 +
 .../function_array_contains.explain                |    2 +
 .../function_array_distinct.explain                |    2 +
 .../explain-results/function_array_except.explain  |    2 +
 .../explain-results/function_array_insert.explain  |    2 +
 .../function_array_intersect.explain               |    2 +
 .../explain-results/function_array_join.explain    |    2 +
 ...nction_array_join_with_null_replacement.explain |    2 +
 .../explain-results/function_array_max.explain     |    2 +
 .../explain-results/function_array_min.explain     |    2 +
 .../function_array_position.explain                |    2 +
 .../explain-results/function_array_remove.explain  |    2 +
 .../explain-results/function_array_repeat.explain  |    2 +
 .../explain-results/function_array_sort.explain    |    2 +
 .../function_array_sort_with_comparator.explain    |    2 +
 .../explain-results/function_array_union.explain   |    2 +
 .../function_arrays_overlap.explain                |    2 +
 .../explain-results/function_arrays_zip.explain    |    2 +
 .../explain-results/function_concat.explain        |    2 +
 .../explain-results/function_element_at.explain    |    2 +
 .../explain-results/function_exists.explain        |    2 +
 .../explain-results/function_explode.explain       |    3 +
 .../explain-results/function_explode_outer.explain |    3 +
 .../explain-results/function_filter.explain        |    2 +
 .../function_filter_with_pair_input.explain        |    2 +
 .../explain-results/function_flatten.explain       |    2 +
 .../explain-results/function_forall.explain        |    2 +
 .../explain-results/function_from_csv.explain      |    2 +
 .../explain-results/function_from_json.explain     |    2 +
 .../explain-results/function_get.explain           |    2 +
 .../function_get_json_object.explain               |    2 +
 .../explain-results/function_inline.explain        |    3 +
 .../explain-results/function_inline_outer.explain  |    3 +
 .../explain-results/function_json_tuple.explain    |    3 +
 .../explain-results/function_map_concat.explain    |    2 +
 .../function_map_contains_key.explain              |    2 +
 .../explain-results/function_map_entries.explain   |    2 +
 .../explain-results/function_map_filter.explain    |    2 +
 .../function_map_from_entries.explain              |    2 +
 .../explain-results/function_map_keys.explain      |    2 +
 .../explain-results/function_map_values.explain    |    2 +
 .../explain-results/function_map_zip_with.explain  |    2 +
 .../explain-results/function_posexplode.explain    |    3 +
 .../function_posexplode_outer.explain              |    3 +
 .../explain-results/function_reverse.explain       |    2 +
 .../explain-results/function_schema_of_csv.explain |    2 +
 .../function_schema_of_json.explain                |    2 +
 .../function_schema_of_json_with_options.explain   |    2 +
 .../explain-results/function_sequence.explain      |    2 +
 .../explain-results/function_size.explain          |    2 +
 .../explain-results/function_slice.explain         |    2 +
 .../explain-results/function_sort_array.explain    |    2 +
 .../explain-results/function_to_csv.explain        |    2 +
 .../explain-results/function_to_json.explain       |    2 +
 .../explain-results/function_transform.explain     |    2 +
 .../function_transform_keys.explain                |    2 +
 .../function_transform_values.explain              |    2 +
 .../function_transform_with_index.explain          |    2 +
 .../explain-results/function_zip_with.explain      |    2 +
 .../query-tests/queries/function_aggregate.json    |   56 +
 .../queries/function_aggregate.proto.bin           |  Bin 0 -> 227 bytes
 .../query-tests/queries/function_array_append.json |   23 +
 .../queries/function_array_append.proto.bin        |    5 +
 .../queries/function_array_compact.json            |   19 +
 .../queries/function_array_compact.proto.bin       |    4 +
 .../queries/function_array_contains.json           |   23 +
 .../queries/function_array_contains.proto.bin      |    5 +
 .../queries/function_array_distinct.json           |   19 +
 .../queries/function_array_distinct.proto.bin      |    4 +
 .../query-tests/queries/function_array_except.json |   36 +
 .../queries/function_array_except.proto.bin        |    8 +
 .../query-tests/queries/function_array_insert.json |   27 +
 .../queries/function_array_insert.proto.bin        |  Bin 0 -> 185 bytes
 .../queries/function_array_intersect.json          |   32 +
 .../queries/function_array_intersect.proto.bin     |    8 +
 .../query-tests/queries/function_array_join.json   |   23 +
 .../queries/function_array_join.proto.bin          |    6 +
 .../function_array_join_with_null_replacement.json |   27 +
 ...tion_array_join_with_null_replacement.proto.bin |    7 +
 .../query-tests/queries/function_array_max.json    |   19 +
 .../queries/function_array_max.proto.bin           |    4 +
 .../query-tests/queries/function_array_min.json    |   19 +
 .../queries/function_array_min.proto.bin           |    4 +
 .../queries/function_array_position.json           |   23 +
 .../queries/function_array_position.proto.bin      |    5 +
 .../query-tests/queries/function_array_remove.json |   23 +
 .../queries/function_array_remove.proto.bin        |    5 +
 .../query-tests/queries/function_array_repeat.json |   23 +
 .../queries/function_array_repeat.proto.bin        |    5 +
 .../query-tests/queries/function_array_sort.json   |   19 +
 .../queries/function_array_sort.proto.bin          |    5 +
 .../function_array_sort_with_comparator.json       |   41 +
 .../function_array_sort_with_comparator.proto.bin  |   11 +
 .../query-tests/queries/function_array_union.json  |   36 +
 .../queries/function_array_union.proto.bin         |    8 +
 .../queries/function_arrays_overlap.json           |   32 +
 .../queries/function_arrays_overlap.proto.bin      |    7 +
 .../query-tests/queries/function_arrays_zip.json   |   36 +
 .../queries/function_arrays_zip.proto.bin          |    9 +
 .../query-tests/queries/function_concat.json       |   49 +
 .../query-tests/queries/function_concat.proto.bin  |   11 +
 .../query-tests/queries/function_element_at.json   |   23 +
 .../queries/function_element_at.proto.bin          |    6 +
 .../query-tests/queries/function_exists.json       |   39 +
 .../query-tests/queries/function_exists.proto.bin  |   10 +
 .../query-tests/queries/function_explode.json      |   19 +
 .../query-tests/queries/function_explode.proto.bin |    4 +
 .../queries/function_explode_outer.json            |   19 +
 .../queries/function_explode_outer.proto.bin       |    4 +
 .../query-tests/queries/function_filter.json       |   39 +
 .../query-tests/queries/function_filter.proto.bin  |   10 +
 .../queries/function_filter_with_pair_input.json   |   59 +
 .../function_filter_with_pair_input.proto.bin      |   15 +
 .../query-tests/queries/function_flatten.json      |   41 +
 .../query-tests/queries/function_flatten.proto.bin |   10 +
 .../query-tests/queries/function_forall.json       |   39 +
 .../query-tests/queries/function_forall.proto.bin  |   10 +
 .../query-tests/queries/function_from_csv.json     |   36 +
 .../queries/function_from_csv.proto.bin            |    9 +
 .../query-tests/queries/function_from_json.json    |   23 +
 .../queries/function_from_json.proto.bin           |    5 +
 .../query-tests/queries/function_get.json          |   23 +
 .../query-tests/queries/function_get.proto.bin     |    5 +
 .../queries/function_get_json_object.json          |   23 +
 .../queries/function_get_json_object.proto.bin     |    5 +
 .../query-tests/queries/function_inline.json       |   24 +
 .../query-tests/queries/function_inline.proto.bin  |    6 +
 .../query-tests/queries/function_inline_outer.json |   24 +
 .../queries/function_inline_outer.proto.bin        |    6 +
 .../query-tests/queries/function_json_tuple.json   |   31 +
 .../queries/function_json_tuple.proto.bin          |    8 +
 .../query-tests/queries/function_map_concat.json   |   60 +
 .../queries/function_map_concat.proto.bin          |   16 +
 .../queries/function_map_contains_key.json         |   23 +
 .../queries/function_map_contains_key.proto.bin    |    5 +
 .../query-tests/queries/function_map_entries.json  |   19 +
 .../queries/function_map_entries.proto.bin         |    4 +
 .../query-tests/queries/function_map_filter.json   |   41 +
 .../queries/function_map_filter.proto.bin          |   11 +
 .../queries/function_map_from_entries.json         |   46 +
 .../queries/function_map_from_entries.proto.bin    |   11 +
 .../query-tests/queries/function_map_keys.json     |   19 +
 .../queries/function_map_keys.proto.bin            |    4 +
 .../query-tests/queries/function_map_values.json   |   19 +
 .../queries/function_map_values.proto.bin          |    5 +
 .../query-tests/queries/function_map_zip_with.json |   65 ++
 .../queries/function_map_zip_with.proto.bin        |   16 +
 .../query-tests/queries/function_posexplode.json   |   19 +
 .../queries/function_posexplode.proto.bin          |    5 +
 .../queries/function_posexplode_outer.json         |   19 +
 .../queries/function_posexplode_outer.proto.bin    |    4 +
 .../query-tests/queries/function_reverse.json      |   19 +
 .../query-tests/queries/function_reverse.proto.bin |    4 +
 .../queries/function_schema_of_csv.json            |   32 +
 .../queries/function_schema_of_csv.proto.bin       |    7 +
 .../queries/function_schema_of_json.json           |   19 +
 .../queries/function_schema_of_json.proto.bin      |    4 +
 .../function_schema_of_json_with_options.json      |   32 +
 .../function_schema_of_json_with_options.proto.bin |    7 +
 .../query-tests/queries/function_sequence.json     |   27 +
 .../queries/function_sequence.proto.bin            |    7 +
 .../query-tests/queries/function_size.json         |   19 +
 .../query-tests/queries/function_size.proto.bin    |    4 +
 .../query-tests/queries/function_slice.json        |   27 +
 .../query-tests/queries/function_slice.proto.bin   |  Bin 0 -> 178 bytes
 .../query-tests/queries/function_sort_array.json   |   23 +
 .../queries/function_sort_array.proto.bin          |    6 +
 .../query-tests/queries/function_to_csv.json       |   32 +
 .../query-tests/queries/function_to_csv.proto.bin  |    7 +
 .../query-tests/queries/function_to_json.json      |   32 +
 .../query-tests/queries/function_to_json.proto.bin |    8 +
 .../query-tests/queries/function_transform.json    |   39 +
 .../queries/function_transform.proto.bin           |    9 +
 .../queries/function_transform_keys.json           |   50 +
 .../queries/function_transform_keys.proto.bin      |   12 +
 .../queries/function_transform_values.json         |   42 +
 .../queries/function_transform_values.proto.bin    |   10 +
 .../queries/function_transform_with_index.json     |   41 +
 .../function_transform_with_index.proto.bin        |   10 +
 .../query-tests/queries/function_zip_with.json     |   45 +
 .../queries/function_zip_with.proto.bin            |   11 +
 187 files changed, 3967 insertions(+), 4 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 0fd35b570f8..dd2380e8bc4 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql
 
 import java.math.{BigDecimal => JBigDecimal}
 import java.time.LocalDate
+import java.util.Collections
 
+import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 
 import com.google.protobuf.ByteString
@@ -26,6 +28,8 @@ import com.google.protobuf.ByteString
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.connect.client.unsupported
 import org.apache.spark.sql.expressions.{ScalarUserDefinedFunction, UserDefinedFunction}
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.DataType.parseTypeWithFallback
 
 /**
  * Commonly used functions available for DataFrame operations. Using functions defined here
@@ -981,7 +985,7 @@ object functions {
    * @since 3.4.0
    */
   def lag(e: Column, offset: Int, defaultValue: Any): Column = {
-    lag(e, offset, defaultValue, false)
+    lag(e, offset, defaultValue, ignoreNulls = false)
   }
 
   /**
@@ -1052,7 +1056,7 @@ object functions {
    * @since 3.4.0
    */
   def lead(e: Column, offset: Int, defaultValue: Any): Column = {
-    lead(e, offset, defaultValue, false)
+    lead(e, offset, defaultValue, ignoreNulls = false)
   }
 
   /**
@@ -3845,6 +3849,1213 @@ object functions {
    */
   def timestamp_seconds(e: Column): Column = Column.fn("timestamp_seconds", e)
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // Collection functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns null if the array is null, true if the array contains `value`, and false otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_contains(column: Column, value: Any): Column =
+    Column.fn("array_contains", column, lit(value))
+
+  /**
+   * Returns an ARRAY containing all elements from the source ARRAY as well as the new element.
+   * The new element/column is located at the end of the ARRAY.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_append(column: Column, element: Any): Column =
+    Column.fn("array_append", column, lit(element))
+
+  /**
+   * Returns `true` if `a1` and `a2` have at least one non-null element in common. If not and both
+   * the arrays are non-empty and any of them contains a `null`, it returns `null`. It returns
+   * `false` otherwise.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def arrays_overlap(a1: Column, a2: Column): Column = Column.fn("arrays_overlap", a1, a2)
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Int, length: Int): Column =
+    slice(x, lit(start), lit(length))
+
+  /**
+   * Returns an array containing all the elements in `x` from index `start` (or starting from the
+   * end if `start` is negative) with the specified `length`.
+   *
+   * @param x
+   *   the array column to be sliced
+   * @param start
+   *   the starting index
+   * @param length
+   *   the length of the slice
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def slice(x: Column, start: Column, length: Column): Column =
+    Column.fn("slice", x, start, length)
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`. Null values are replaced with
+   * `nullReplacement`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String, nullReplacement: String): Column =
+    Column.fn("array_join", column, lit(delimiter), lit(nullReplacement))
+
+  /**
+   * Concatenates the elements of `column` using the `delimiter`.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_join(column: Column, delimiter: String): Column =
+    Column.fn("array_join", column, lit(delimiter))
+
+  /**
+   * Concatenates multiple input columns together into a single column. The function works with
+   * strings, binary and compatible array columns.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def concat(exprs: Column*): Column = Column.fn("concat", exprs: _*)
+
+  /**
+   * Locates the position of the first occurrence of the value in the given array as long. Returns
+   * null if either of the arguments are null.
+   *
+   * @note
+   *   The position is not zero based, but 1 based index. Returns 0 if value could not be found in
+   *   array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_position(column: Column, value: Any): Column =
+    Column.fn("array_position", column, lit(value))
+
+  /**
+   * Returns element of array at given index in value if column is array. Returns value for the
+   * given key in value if column is map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def element_at(column: Column, value: Any): Column = Column.fn("element_at", column, lit(value))
+
+  /**
+   * Returns element of array at given (0-based) index. If the index points outside of the array
+   * boundaries, then this function returns NULL.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get(column: Column, index: Column): Column = Column.fn("get", column, index)
+
+  /**
+   * Sorts the input array in ascending order. The elements of the input array must be orderable.
+   * NaN is greater than any non-NaN elements for double/float type. Null elements will be placed
+   * at the end of the returned array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column): Column = Column.fn("array_sort", e)
+
+  /**
+   * Sorts the input array based on the given comparator function. The comparator will take two
+   * arguments representing two elements of the array. It returns a negative integer, 0, or a
+   * positive integer as the first element is less than, equal to, or greater than the second
+   * element. If the comparator function returns null, the function will fail and raise an error.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_sort(e: Column, comparator: (Column, Column) => Column): Column =
+    Column.fn("array_sort", e, createLambda(comparator))
+
+  /**
+   * Removes all elements equal to `element` from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_remove(column: Column, element: Any): Column =
+    Column.fn("array_remove", column, lit(element))
+
+  /**
+   * Remove all null elements from the given array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_compact(column: Column): Column = Column.fn("array_compact", column)
+
+  /**
+   * Removes duplicate values from the array.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_distinct(e: Column): Column = Column.fn("array_distinct", e)
+
+  /**
+   * Returns an array of the elements in the intersection of the given two arrays, without
+   * duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_intersect(col1: Column, col2: Column): Column =
+    Column.fn("array_intersect", col1, col2)
+
+  /**
+   * Adds an item into a given array at a specified position.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_insert(arr: Column, pos: Column, value: Column): Column =
+    Column.fn("array_insert", arr, pos, value)
+
+  /**
+   * Returns an array of the elements in the union of the given two arrays, without duplicates.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_union(col1: Column, col2: Column): Column =
+    Column.fn("array_union", col1, col2)
+
+  /**
+   * Returns an array of the elements in the first array but not in the second array, without
+   * duplicates. The order of elements in the result is not determined.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_except(col1: Column, col2: Column): Column =
+    Column.fn("array_except", col1, col2)
+
+  private def newLambdaVariable(name: String): proto.Expression.UnresolvedNamedLambdaVariable = {
+    proto.Expression.UnresolvedNamedLambdaVariable
+      .newBuilder()
+      .addNameParts(name)
+      .build()
+  }
+
+  private def toLambdaVariableColumn(
+      v: proto.Expression.UnresolvedNamedLambdaVariable): Column = {
+    Column(_.setUnresolvedNamedLambdaVariable(v))
+  }
+
+  private def createLambda(f: Column => Column): Column = Column { builder =>
+    val x = newLambdaVariable("x")
+    val function = f(toLambdaVariableColumn(x))
+    builder.getLambdaFunctionBuilder
+      .setFunction(function.expr)
+      .addArguments(x)
+  }
+
+  private def createLambda(f: (Column, Column) => Column) = Column { builder =>
+    val x = newLambdaVariable("x")
+    val y = newLambdaVariable("y")
+    val function = f(toLambdaVariableColumn(x), toLambdaVariableColumn(y))
+    builder.getLambdaFunctionBuilder
+      .setFunction(function.expr)
+      .addArguments(x)
+      .addArguments(y)
+  }
+
+  private def createLambda(f: (Column, Column, Column) => Column) = Column { builder =>
+    val x = newLambdaVariable("x")
+    val y = newLambdaVariable("y")
+    val z = newLambdaVariable("z")
+    val function =
+      f(toLambdaVariableColumn(x), toLambdaVariableColumn(y), toLambdaVariableColumn(z))
+    builder.getLambdaFunctionBuilder
+      .setFunction(function.expr)
+      .addArguments(x)
+      .addArguments(y)
+      .addArguments(z)
+  }
+
+  /**
+   * Returns an array of elements after applying a transformation to each element in the input
+   * array.
+   * {{{
+   *   df.select(transform(col("i"), x => x + 1))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   col => transformed_col, the lambda function to transform the input column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def transform(column: Column, f: Column => Column): Column =
+    Column.fn("transform", column, createLambda(f))
+
+  /**
+   * Returns an array of elements after applying a transformation to each element in the input
+   * array.
+   * {{{
+   *   df.select(transform(col("i"), (x, i) => x + i))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   (col, index) => transformed_col, the lambda function to transform the input column given the
+   *   index. Indices start at 0.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def transform(column: Column, f: (Column, Column) => Column): Column =
+    Column.fn("transform", column, createLambda(f))
+
+  /**
+   * Returns whether a predicate holds for one or more elements in the array.
+   * {{{
+   *   df.select(exists(col("i"), _ % 2 === 0))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   col => predicate, the Boolean predicate to check the input column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def exists(column: Column, f: Column => Column): Column =
+    Column.fn("exists", column, createLambda(f))
+
+  /**
+   * Returns whether a predicate holds for every element in the array.
+   * {{{
+   *   df.select(forall(col("i"), x => x % 2 === 0))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   col => predicate, the Boolean predicate to check the input column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def forall(column: Column, f: Column => Column): Column =
+    Column.fn("forall", column, createLambda(f))
+
+  /**
+   * Returns an array of elements for which a predicate holds in a given array.
+   * {{{
+   *   df.select(filter(col("s"), x => x % 2 === 0))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   col => predicate, the Boolean predicate to filter the input column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def filter(column: Column, f: Column => Column): Column =
+    Column.fn("filter", column, createLambda(f))
+
+  /**
+   * Returns an array of elements for which a predicate holds in a given array.
+   * {{{
+   *   df.select(filter(col("s"), (x, i) => i % 2 === 0))
+   * }}}
+   *
+   * @param column
+   *   the input array column
+   * @param f
+   *   (col, index) => predicate, the Boolean predicate to filter the input column given the
+   *   index. Indices start at 0.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def filter(column: Column, f: (Column, Column) => Column): Column =
+    Column.fn("filter", column, createLambda(f))
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the array, and reduces this
+   * to a single state. The final state is converted into the final result by applying a finish
+   * function.
+   * {{{
+   *   df.select(aggregate(col("i"), lit(0), (acc, x) => acc + x, _ * 10))
+   * }}}
+   *
+   * @param expr
+   *   the input array column
+   * @param initialValue
+   *   the initial value
+   * @param merge
+   *   (combined_value, input_value) => combined_value, the merge function to merge an input value
+   *   to the combined_value
+   * @param finish
+   *   combined_value => final_value, the lambda function to convert the combined value of all
+   *   inputs to final result
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def aggregate(
+      expr: Column,
+      initialValue: Column,
+      merge: (Column, Column) => Column,
+      finish: Column => Column): Column =
+    Column.fn("aggregate", expr, initialValue, createLambda(merge), createLambda(finish))
+
+  /**
+   * Applies a binary operator to an initial state and all elements in the array, and reduces this
+   * to a single state.
+   * {{{
+   *   df.select(aggregate(col("i"), lit(0), (acc, x) => acc + x))
+   * }}}
+   *
+   * @param expr
+   *   the input array column
+   * @param initialValue
+   *   the initial value
+   * @param merge
+   *   (combined_value, input_value) => combined_value, the merge function to merge an input value
+   *   to the combined_value
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def aggregate(expr: Column, initialValue: Column, merge: (Column, Column) => Column): Column =
+    aggregate(expr, initialValue, merge, c => c)
+
+  /**
+   * Merge two given arrays, element-wise, into a single array using a function. If one array is
+   * shorter, nulls are appended at the end to match the length of the longer array, before
+   * applying the function.
+   * {{{
+   *   df.select(zip_with(df1("val1"), df1("val2"), (x, y) => x + y))
+   * }}}
+   *
+   * @param left
+   *   the left input array column
+   * @param right
+   *   the right input array column
+   * @param f
+   *   (lCol, rCol) => col, the lambda function to merge two input columns into one column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def zip_with(left: Column, right: Column, f: (Column, Column) => Column): Column =
+    Column.fn("zip_with", left, right, createLambda(f))
+
+  /**
+   * Applies a function to every key-value pair in a map and returns a map with the results of
+   * those applications as the new keys for the pairs.
+   * {{{
+   *   df.select(transform_keys(col("i"), (k, v) => k + v))
+   * }}}
+   *
+   * @param expr
+   *   the input map column
+   * @param f
+   *   (key, value) => new_key, the lambda function to transform the key of input map column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def transform_keys(expr: Column, f: (Column, Column) => Column): Column =
+    Column.fn("transform_keys", expr, createLambda(f))
+
+  /**
+   * Applies a function to every key-value pair in a map and returns a map with the results of
+   * those applications as the new values for the pairs.
+   * {{{
+   *   df.select(transform_values(col("i"), (k, v) => k + v))
+   * }}}
+   *
+   * @param expr
+   *   the input map column
+   * @param f
+   *   (key, value) => new_value, the lambda function to transform the value of input map column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def transform_values(expr: Column, f: (Column, Column) => Column): Column =
+    Column.fn("transform_values", expr, createLambda(f))
+
+  /**
+   * Returns a map whose key-value pairs satisfy a predicate.
+   * {{{
+   *   df.select(map_filter(col("m"), (k, v) => k * 10 === v))
+   * }}}
+   *
+   * @param expr
+   *   the input map column
+   * @param f
+   *   (key, value) => predicate, the Boolean predicate to filter the input map column
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_filter(expr: Column, f: (Column, Column) => Column): Column =
+    Column.fn("map_filter", expr, createLambda(f))
+
+  /**
+   * Merge two given maps, key-wise into a single map using a function.
+   * {{{
+   *   df.select(map_zip_with(df("m1"), df("m2"), (k, v1, v2) => k === v1 + v2))
+   * }}}
+   *
+   * @param left
+   *   the left input map column
+   * @param right
+   *   the right input map column
+   * @param f
+   *   (key, value1, value2) => new_value, the lambda function to merge the map values
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_zip_with(left: Column, right: Column, f: (Column, Column, Column) => Column): Column =
+    Column.fn("map_zip_with", left, right, createLambda(f))
+
+  /**
+   * Creates a new row for each element in the given array or map column. Uses the default column
+   * name `col` for elements in the array and `key` and `value` for elements in the map unless
+   * specified otherwise.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def explode(e: Column): Column = Column.fn("explode", e)
+
+  /**
+   * Creates a new row for each element in the given array or map column. Uses the default column
+   * name `col` for elements in the array and `key` and `value` for elements in the map unless
+   * specified otherwise. Unlike explode, if the array/map is null or empty then null is produced.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def explode_outer(e: Column): Column = Column.fn("explode_outer", e)
+
+  /**
+   * Creates a new row for each element with position in the given array or map column. Uses the
+   * default column name `pos` for position, and `col` for elements in the array and `key` and
+   * `value` for elements in the map unless specified otherwise.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def posexplode(e: Column): Column = Column.fn("posexplode", e)
+
+  /**
+   * Creates a new row for each element with position in the given array or map column. Uses the
+   * default column name `pos` for position, and `col` for elements in the array and `key` and
+   * `value` for elements in the map unless specified otherwise. Unlike posexplode, if the
+   * array/map is null or empty then the row (null, null) is produced.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def posexplode_outer(e: Column): Column = Column.fn("posexplode_outer", e)
+
+  /**
+   * Creates a new row for each element in the given array of structs.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def inline(e: Column): Column = Column.fn("inline", e)
+
+  /**
+   * Creates a new row for each element in the given array of structs. Unlike inline, if the array
+   * is null or empty then null is produced for each nested column.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def inline_outer(e: Column): Column = Column.fn("inline_outer", e)
+
+  /**
+   * Extracts json object from a json string based on json path specified, and returns json string
+   * of the extracted json object. It will return null if the input json string is invalid.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def get_json_object(e: Column, path: String): Column =
+    Column.fn("get_json_object", e, lit(path))
+
+  /**
+   * Creates a new row for a json column according to the given field names.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def json_tuple(json: Column, fields: String*): Column = {
+    require(fields.nonEmpty, "at least 1 field name should be given.")
+    Column.fn("json_tuple", json +: fields.map(lit): _*)
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_json(e, schema.asInstanceOf[DataType], options)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` with the specified schema. Returns `null`, in the
+   * case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = {
+    from_json(e, lit(schema.json), options.iterator)
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` with the specified schema. Returns `null`, in the
+   * case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: DataType, options: java.util.Map[String, String]): Column = {
+    from_json(e, schema, options.asScala.toMap)
+  }
+
+  /**
+   * Parses a column containing a JSON string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def from_json(e: Column, schema: StructType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
+   * `StructType` or `ArrayType` with the specified schema. Returns `null`, in the case of an
+   * unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def from_json(e: Column, schema: DataType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` with the specified schema. Returns `null`, in the
+   * case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema as a DDL-formatted string.
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
+    from_json(e, schema, options.asScala.toMap)
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` with the specified schema. Returns `null`, in the
+   * case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema as a DDL-formatted string.
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: String, options: Map[String, String]): Column = {
+    val dataType =
+      parseTypeWithFallback(schema, DataType.fromJson, fallbackParser = DataType.fromDDL)
+    from_json(e, dataType, options)
+  }
+
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema. Returns
+   * `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def from_json(e: Column, schema: Column): Column = {
+    from_json(e, schema, Iterator.empty)
+  }
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema. Returns
+   * `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing JSON data.
+   * @param schema
+   *   the schema to use when parsing the json string
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_json(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    from_json(e, schema, options.asScala.iterator)
+  }
+
+  /**
+   * Invoke function `name` with `arguments`, followed by a `map(...)` column built from the
+   * options as alternating key and value literals. Without options no map argument is added.
+   */
+  private def fnWithOptions(
+      name: String,
+      options: Iterator[(String, String)],
+      arguments: Column*): Column = {
+    val augmentedArguments = if (options.hasNext) {
+      val flattenedKeyValueIterator = options.flatMap { case (k, v) =>
+        Iterator(lit(k), lit(v))
+      }
+      arguments :+ map(flattenedKeyValueIterator.toSeq: _*)
+    } else {
+      arguments
+    }
+    Column.fn(name, augmentedArguments: _*)
+  }
+
+  private def from_json(
+      e: Column,
+      schema: Column,
+      options: Iterator[(String, String)]): Column = {
+    fnWithOptions("from_json", options, e, schema)
+  }
+
+  /**
+   * Parses a JSON string and infers its schema in DDL format.
+   *
+   * @param json
+   *   a JSON string.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def schema_of_json(json: String): Column = schema_of_json(lit(json))
+
+  /**
+   * Parses a JSON string and infers its schema in DDL format.
+   *
+   * @param json
+   *   a foldable string column containing a JSON string.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def schema_of_json(json: Column): Column = Column.fn("schema_of_json", json)
+
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a JSON string and infers its schema in DDL format using options.
+   *
+   * @param json
+   *   a foldable string column containing JSON data.
+   * @param options
+   *   options to control how the json is parsed. Accepts the same options as the json data
+   *   source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @return
+   *   a column with string literal containing schema in DDL format.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_json(json: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("schema_of_json", options.asScala.iterator, json)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or a `MapType` into
+   * a JSON string with the specified schema. Throws an exception, in the case of an unsupported
+   * type.
+   *
+   * @param e
+   *   a column containing a struct, an array or a map.
+   * @param options
+   *   options to control how the struct column is converted into a json string. Accepts the same
+   *   options as the json data source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use. Additionally the function supports the `pretty`
+   *   option which enables pretty JSON generation.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def to_json(e: Column, options: Map[String, String]): Column =
+    fnWithOptions("to_json", options.iterator, e)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Converts a column containing a `StructType`, `ArrayType` or a `MapType` into
+   * a JSON string with the specified schema. Throws an exception, in the case of an unsupported
+   * type.
+   *
+   * @param e
+   *   a column containing a struct, an array or a map.
+   * @param options
+   *   options to control how the struct column is converted into a json string. Accepts the same
+   *   options as the json data source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option"> Data
+   *   Source Option</a> in the version you use. Additionally the function supports the `pretty`
+   *   option which enables pretty JSON generation.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def to_json(e: Column, options: java.util.Map[String, String]): Column =
+    to_json(e, options.asScala.toMap)
+
+  /**
+   * Converts a column containing a `StructType`, `ArrayType` or a `MapType` into a JSON string
+   * with the specified schema. Throws an exception, in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct, an array or a map.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def to_json(e: Column): Column =
+    to_json(e, Map.empty[String, String])
+
+  /**
+   * Returns length of array or map.
+   *
+   * The function returns null for null input if spark.sql.legacy.sizeOfNull is set to false or
+   * spark.sql.ansi.enabled is set to true. Otherwise, the function returns -1 for null input.
+   * With the default settings, the function returns -1 for null input.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def size(e: Column): Column = Column.fn("size", e)
+
+  /**
+   * Sorts the input array for the given column in ascending order, according to the natural
+   * ordering of the array elements. Null elements will be placed at the beginning of the returned
+   * array.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def sort_array(e: Column): Column = sort_array(e, asc = true)
+
+  /**
+   * Sorts the input array for the given column in ascending or descending order, according to the
+   * natural ordering of the array elements. NaN is greater than any non-NaN elements for
+   * double/float type. Null elements will be placed at the beginning of the returned array in
+   * ascending order or at the end of the returned array in descending order.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def sort_array(e: Column, asc: Boolean): Column = Column.fn("sort_array", e, lit(asc))
+
+  /**
+   * Returns the minimum value in the array. NaN is greater than any non-NaN elements for
+   * double/float type. NULL elements are skipped.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_min(e: Column): Column = Column.fn("array_min", e)
+
+  /**
+   * Returns the maximum value in the array. NaN is greater than any non-NaN elements for
+   * double/float type. NULL elements are skipped.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_max(e: Column): Column = Column.fn("array_max", e)
+
+  /**
+   * Returns a random permutation of the given array.
+   *
+   * @note
+   *   The function is non-deterministic.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def shuffle(e: Column): Column = Column.fn("shuffle", e)
+
+  /**
+   * Returns a reversed string or an array with reverse order of elements.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def reverse(e: Column): Column = Column.fn("reverse", e)
+
+  /**
+   * Creates a single array from an array of arrays. If a structure of nested arrays is deeper
+   * than two levels, only one level of nesting is removed.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def flatten(e: Column): Column = Column.fn("flatten", e)
+
+  /**
+   * Generate a sequence of integers from start to stop, incrementing by step.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def sequence(start: Column, stop: Column, step: Column): Column =
+    Column.fn("sequence", start, stop, step)
+
+  /**
+   * Generate a sequence of integers from start to stop, incrementing by 1. This overload always
+   * passes an explicit step of 1, so it does not fall back to -1 when start is greater than stop.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def sequence(start: Column, stop: Column): Column = sequence(start, stop, lit(1L))
+
+  /**
+   * Creates an array containing the left argument repeated the number of times given by the right
+   * argument.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_repeat(left: Column, right: Column): Column = Column.fn("array_repeat", left, right)
+
+  /**
+   * Creates an array containing the left argument repeated the number of times given by the right
+   * argument.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def array_repeat(e: Column, count: Int): Column = array_repeat(e, lit(count))
+
+  /**
+   * Returns true if the map contains the key.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_contains_key(column: Column, key: Any): Column =
+    Column.fn("map_contains_key", column, lit(key))
+
+  /**
+   * Returns an unordered array containing the keys of the map.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_keys(e: Column): Column = Column.fn("map_keys", e)
+
+  /**
+   * Returns an unordered array containing the values of the map.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_values(e: Column): Column = Column.fn("map_values", e)
+
+  /**
+   * Returns an unordered array of all entries in the given map.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_entries(e: Column): Column = Column.fn("map_entries", e)
+
+  /**
+   * Returns a map created from the given array of entries.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def map_from_entries(e: Column): Column = Column.fn("map_from_entries", e)
+
+  /**
+   * Returns a merged array of structs in which the N-th struct contains all N-th values of input
+   * arrays.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def arrays_zip(e: Column*): Column = Column.fn("arrays_zip", e: _*)
+
+  /**
+   * Returns the union of all the given maps.
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def map_concat(cols: Column*): Column = Column.fn("map_concat", cols: _*)
+
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a column containing a CSV string into a `StructType` with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing CSV data.
+   * @param schema
+   *   the schema to use when parsing the CSV string
+   * @param options
+   *   options to control how the CSV is parsed. Accepts the same options as the CSV data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_csv(e: Column, schema: StructType, options: Map[String, String]): Column =
+    from_csv(e, lit(schema.toDDL), options.iterator)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Parses a column containing a CSV string into a `StructType` with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e
+   *   a string column containing CSV data.
+   * @param schema
+   *   the schema to use when parsing the CSV string
+   * @param options
+   *   options to control how the CSV is parsed. Accepts the same options as the CSV data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def from_csv(e: Column, schema: Column, options: java.util.Map[String, String]): Column =
+    from_csv(e, schema, options.asScala.iterator)
+
+  private def from_csv(e: Column, schema: Column, options: Iterator[(String, String)]): Column =
+    fnWithOptions("from_csv", options, e, schema)
+
+  /**
+   * Parses a CSV string and infers its schema in DDL format.
+   *
+   * @param csv
+   *   a CSV string.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def schema_of_csv(csv: String): Column = schema_of_csv(lit(csv))
+
+  /**
+   * Parses a CSV string and infers its schema in DDL format.
+   *
+   * @param csv
+   *   a foldable string column containing a CSV string.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def schema_of_csv(csv: Column): Column = schema_of_csv(csv, Collections.emptyMap())
+
+  // scalastyle:off line.size.limit
+  /**
+   * Parses a CSV string and infers its schema in DDL format using options.
+   *
+   * @param csv
+   *   a foldable string column containing a CSV string.
+   * @param options
+   *   options to control how the CSV is parsed. Accepts the same options as the CSV data source.
+   *   See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   * @return
+   *   a column with string literal containing schema in DDL format.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def schema_of_csv(csv: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("schema_of_csv", options.asScala.iterator, csv)
+
+  // scalastyle:off line.size.limit
+  /**
+   * (Java-specific) Converts a column containing a `StructType` into a CSV string with the
+   * specified schema. Throws an exception, in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   * @param options
+   *   options to control how the struct column is converted into a CSV string. It accepts the
+   *   same options as the CSV data source. See <a href=
+   *   "https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option"> Data
+   *   Source Option</a> in the version you use.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  // scalastyle:on line.size.limit
+  def to_csv(e: Column, options: java.util.Map[String, String]): Column =
+    fnWithOptions("to_csv", options.asScala.iterator, e)
+
+  /**
+   * Converts a column containing a `StructType` into a CSV string with the specified schema.
+   * Throws an exception, in the case of an unsupported type.
+   *
+   * @param e
+   *   a column containing a struct.
+   *
+   * @group collection_funcs
+   * @since 3.4.0
+   */
+  def to_csv(e: Column): Column = to_csv(e, Collections.emptyMap())
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   // Partition Transforms functions
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index e5d426e80f9..c9d9f43cbaf 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -27,7 +27,7 @@ import org.apache.commons.io.output.TeeOutputStream
 import org.scalactic.TolerantNumerics
 
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}
-import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.functions.{aggregate, array, col, lit, sequence, shuffle, transform, udf}
 import org.apache.spark.sql.types._
 
 class ClientE2ETestSuite extends RemoteSparkSession {
@@ -373,4 +373,30 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
     checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
   }
+
+  test("lambda functions") {
+    // This test is mostly to validate lambda variables are properly resolved.
+    val result = spark
+      .range(3)
+      .select(
+        col("id"),
+        array(sequence(col("id"), lit(10)), sequence(col("id") * 2, lit(10))).as("data"))
+      .select(col("id"), transform(col("data"), x => transform(x, x => x + 1)).as("data"))
+      .select(
+        col("id"),
+        transform(col("data"), x => aggregate(x, lit(0L), (x, y) => x + y)).as("summaries"))
+      .collect()
+    val expected = Array(Row(0L, Seq(66L, 66L)), Row(1L, Seq(65L, 63L)), Row(2L, Seq(63L, 56L)))
+    assert(result === expected)
+  }
+
+  test("shuffle array") {
+    // We cannot do structural tests for shuffle because its random seed will always change.
+    val result = spark
+      .sql("select 1")
+      .select(shuffle(array(lit(1), lit(2), lit(3), lit(74))))
+      .head()
+      .getSeq[Int](0)
+    assert(result.toSet === Set(1, 2, 3, 74))
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
index f9118b93ec5..b9fba1eaa8c 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/FunctionTestSuite.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.sql
 
+import java.util.Collections
+
 import org.scalatest.funsuite.{AnyFunSuite => ConnectFunSuite} // scalastyle:ignore funsuite
 
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DataType, StructType}
 
 /**
  * Tests for client local function behavior.
@@ -38,6 +41,10 @@ class FunctionTestSuite extends ConnectFunSuite {
   private val b = col("b")
   private val c = col("c")
 
+  private val schema = new StructType()
+    .add("key", "long")
+    .add("value", "string")
+
   testEquals("col/column", a, column("a"))
   testEquals("asc/asc_nulls_first", asc("a"), asc_nulls_first("a"))
   testEquals("desc/desc_nulls_last", desc("a"), desc_nulls_last("a"))
@@ -149,7 +156,6 @@ class FunctionTestSuite extends ConnectFunSuite {
   testEquals("tanh", tanh("a"), tanh(a))
   testEquals("degrees", toDegrees(a), toDegrees("a"), degrees(a), degrees("a"))
   testEquals("radians", toRadians(a), toRadians("a"), radians(a), radians("a"))
-
   testEquals(
     "regexp_replace",
     regexp_replace(a, lit("foo"), lit("bar")),
@@ -170,6 +176,7 @@ class FunctionTestSuite extends ConnectFunSuite {
     window(a, "10 seconds", "10 seconds"),
     window(a, "10 seconds"))
   testEquals("session_window", session_window(a, "1 second"), session_window(a, lit("1 second")))
+  testEquals("slice", slice(a, 1, 2), slice(a, lit(1), lit(2)))
   testEquals("bucket", bucket(lit(3), a), bucket(3, a))
   testEquals(
     "lag",
@@ -185,6 +192,40 @@ class FunctionTestSuite extends ConnectFunSuite {
     lead(a, 2, null),
     lead("a", 2, null),
     lead(a, 2, null, false))
+  testEquals(
+    "aggregate",
+    aggregate(a, lit(0), (l, r) => l + r),
+    aggregate(a, lit(0), (l, r) => l + r, id => id))
+  testEquals(
+    "from_json",
+    from_json(a, schema.asInstanceOf[DataType]),
+    from_json(a, schema),
+    from_json(a, lit(schema.json)),
+    from_json(a, schema.json, Map.empty[String, String]),
+    from_json(a, schema.json, Collections.emptyMap[String, String]),
+    from_json(a, schema.asInstanceOf[DataType], Map.empty[String, String]),
+    from_json(a, schema.asInstanceOf[DataType], Collections.emptyMap[String, String]),
+    from_json(a, schema, Map.empty[String, String]),
+    from_json(a, schema, Collections.emptyMap[String, String]),
+    from_json(a, lit(schema.json), Collections.emptyMap[String, String]))
+  testEquals("schema_of_json", schema_of_json(lit("x,y")), schema_of_json("x,y"))
+  testEquals(
+    "to_json",
+    to_json(a),
+    to_json(a, Collections.emptyMap[String, String]),
+    to_json(a, Map.empty[String, String]))
+  testEquals("sort_array", sort_array(a), sort_array(a, asc = true))
+  testEquals("sequence", sequence(lit(1), lit(10)), sequence(lit(1), lit(10), lit(1L)))
+  testEquals(
+    "from_csv",
+    from_csv(a, lit(schema.toDDL), Collections.emptyMap[String, String]),
+    from_csv(a, schema, Map.empty[String, String]))
+  testEquals(
+    "schema_of_csv",
+    schema_of_csv(lit("x,y")),
+    schema_of_csv("x,y"),
+    schema_of_csv(lit("x,y"), Collections.emptyMap()))
+  testEquals("to_csv", to_csv(a), to_csv(a, Collections.emptyMap[String, String]))
 
   test("assert_true no message") {
     val e = assert_true(a).expr
@@ -195,6 +236,10 @@ class FunctionTestSuite extends ConnectFunSuite {
     assert(fn.getArguments(0) == a.expr)
   }
 
+  test("json_tuple zero args") {
+    intercept[IllegalArgumentException](json_tuple(a))
+  }
+
   test("rand no seed") {
     val e = rand().expr
     assert(e.hasUnresolvedFunction)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 9ca91942567..9d4ed0f912f 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql
 
 import java.nio.file.{Files, Path}
+import java.util.Collections
 
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
@@ -1607,6 +1608,260 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     fn.timestamp_seconds(fn.col("x"))
   }
 
+  // Array of Long
+  // Array of Long
+  // Array of Array of Long
+  // Map string, Long
+  // Map string, Long
+
+  functionTest("array_contains") {
+    fn.array_contains(fn.col("e"), lit(1))
+  }
+
+  functionTest("array_append") {
+    fn.array_append(fn.col("e"), lit(1))
+  }
+
+  functionTest("arrays_overlap") {
+    fn.arrays_overlap(fn.col("e"), fn.array(lit(1), lit(2)))
+  }
+
+  functionTest("slice") {
+    fn.slice(fn.col("e"), 0, 5)
+  }
+
+  functionTest("array_join") {
+    fn.array_join(fn.col("e"), ";")
+  }
+
+  functionTest("array_join with null replacement") {
+    fn.array_join(fn.col("e"), ";", "null")
+  }
+
+  functionTest("concat") {
+    fn.concat(fn.col("e"), fn.array(lit(1), lit(2)), fn.sequence(lit(33), lit(40)))
+  }
+
+  functionTest("array_position") {
+    fn.array_position(fn.col("e"), 10)
+  }
+
+  functionTest("element_at") {
+    fn.element_at(fn.col("f"), "bob")
+  }
+
+  functionTest("get") {
+    fn.get(fn.col("e"), lit(2))
+  }
+
+  functionTest("array_sort") {
+    fn.array_sort(fn.col("e"))
+  }
+
+  functionTest("array_sort with comparator") {
+    fn.array_sort(fn.col("e"), (l, r) => l - r)
+  }
+
+  functionTest("array_remove") {
+    fn.array_remove(fn.col("e"), 314)
+  }
+
+  functionTest("array_compact") {
+    fn.array_compact(fn.col("e"))
+  }
+
+  functionTest("array_distinct") {
+    fn.array_distinct(fn.col("e"))
+  }
+
+  functionTest("array_intersect") {
+    fn.array_intersect(fn.col("e"), fn.array(lit(10), lit(4)))
+  }
+
+  functionTest("array_insert") {
+    fn.array_insert(fn.col("e"), lit(0), lit(1))
+  }
+
+  functionTest("array_union") {
+    fn.array_union(fn.col("e"), fn.array(lit(1), lit(2), lit(3)))
+  }
+
+  functionTest("array_except") {
+    fn.array_except(fn.col("e"), fn.array(lit(1), lit(2), lit(4)))
+  }
+
+  functionTest("transform") {
+    fn.transform(fn.col("e"), x => x + 1)
+  }
+
+  functionTest("transform with index") {
+    fn.transform(fn.col("e"), (x, i) => x + i)
+  }
+
+  functionTest("exists") {
+    fn.exists(fn.col("e"), x => x > 10)
+  }
+
+  functionTest("forall") {
+    fn.forall(fn.col("e"), x => x > 10)
+  }
+
+  functionTest("filter") {
+    fn.filter(fn.col("e"), x => x > 10)
+  }
+
+  functionTest("filter with pair input") {
+    fn.filter(fn.col("e"), (x, i) => x > 10 && i > 2)
+  }
+
+  functionTest("aggregate") {
+    fn.aggregate(fn.col("e"), lit(0), (x, y) => x + y)
+  }
+
+  functionTest("zip_with") {
+    fn.zip_with(fn.col("e"), fn.col("e"), (x, y) => x + y)
+  }
+
+  functionTest("transform_keys") {
+    fn.transform_keys(fn.col("f"), (k, v) => fn.concat(k, v.getItem("id")))
+  }
+
+  functionTest("transform_values") {
+    fn.transform_values(fn.col("f"), (k, v) => v.withField("key", k))
+  }
+
+  functionTest("map_filter") {
+    fn.map_filter(fn.col("f"), (k, _) => k.contains(lit("baz")))
+  }
+
+  functionTest("map_zip_with") {
+    fn.map_zip_with(fn.col("f"), fn.col("f"), (_, v1, v2) => v1.getItem("id") + v2.getItem("id"))
+  }
+
+  functionTest("explode") {
+    fn.explode(fn.col("e"))
+  }
+
+  functionTest("explode_outer") {
+    fn.explode_outer(fn.col("e"))
+  }
+
+  functionTest("posexplode") {
+    fn.posexplode(fn.col("e"))
+  }
+
+  functionTest("posexplode_outer") {
+    fn.posexplode_outer(fn.col("e"))
+  }
+
+  functionTest("inline") {
+    fn.inline(fn.map_values(fn.col("f")))
+  }
+
+  functionTest("inline_outer") {
+    fn.inline_outer(fn.map_values(fn.col("f")))
+  }
+
+  functionTest("get_json_object") {
+    fn.get_json_object(fn.col("g"), "$.device_type")
+  }
+
+  functionTest("json_tuple") {
+    fn.json_tuple(fn.col("g"), "a", "b", "id")
+  }
+
+  functionTest("from_json") {
+    fn.from_json(fn.col("g"), simpleSchema)
+  }
+
+  functionTest("schema_of_json") {
+    fn.schema_of_json(lit("""[{"col":01}]"""))
+  }
+
+  functionTest("schema_of_json with options") {
+    fn.schema_of_json(
+      lit("""[{"col":01}]"""),
+      Collections.singletonMap("allowNumericLeadingZeros", "true"))
+  }
+
+  functionTest("to_json") {
+    fn.to_json(fn.col("d"), Map(("timestampFormat", "dd/MM/yyyy")))
+  }
+
+  functionTest("size") {
+    fn.size(fn.col("f"))
+  }
+
+  functionTest("sort_array") {
+    fn.sort_array(fn.col("e"))
+  }
+
+  functionTest("array_min") {
+    fn.array_min(fn.col("e"))
+  }
+
+  functionTest("array_max") {
+    fn.array_max(fn.col("e"))
+  }
+
+  functionTest("reverse") {
+    fn.reverse(fn.col("e"))
+  }
+
+  functionTest("flatten") {
+    fn.flatten(fn.array(fn.col("e"), fn.sequence(fn.lit(1), fn.lit(10))))
+  }
+
+  functionTest("sequence") {
+    fn.sequence(fn.lit(1), fn.lit(10))
+  }
+
+  functionTest("array_repeat") {
+    fn.array_repeat(fn.col("a"), 10)
+  }
+
+  functionTest("map_contains_key") {
+    fn.map_contains_key(fn.col("f"), "xyz")
+  }
+
+  functionTest("map_keys") {
+    fn.map_keys(fn.col("f"))
+  }
+
+  functionTest("map_values") {
+    fn.map_values(fn.col("f"))
+  }
+
+  functionTest("map_entries") {
+    fn.map_entries(fn.col("f"))
+  }
+
+  functionTest("map_from_entries") {
+    fn.map_from_entries(fn.transform(fn.col("e"), (x, i) => fn.struct(i, x)))
+  }
+
+  functionTest("arrays_zip") {
+    fn.arrays_zip(fn.col("e"), fn.sequence(lit(1), lit(20)))
+  }
+
+  functionTest("map_concat") {
+    fn.map_concat(
+      fn.col("f"),
+      fn.map(lit("foo"), fn.struct(lit(12L).as("id"), lit(68).as("a"), lit(Math.E).as("b"))))
+  }
+
+  functionTest("from_csv") {
+    fn.from_csv(fn.col("g"), simpleSchema, Map(("mode", "FAILFAST")))
+  }
+
+  functionTest("schema_of_csv") {
+    fn.schema_of_csv(lit("1|abc"), Collections.singletonMap("sep", "|"))
+  }
+
+  functionTest("to_csv") {
+    fn.to_csv(fn.col("d"), Collections.singletonMap("sep", "|"))
+  }
+
   test("groupby agg") {
     simple
       .groupBy(Column("id"))
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_aggregate.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aggregate.explain
new file mode 100644
index 00000000000..31fe84066f8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_aggregate.explain
@@ -0,0 +1,2 @@
+Project [aggregate(e#0, 0, lambdafunction((lambda x#0 + lambda y#0), lambda x#0, lambda y#0, false), lambdafunction(lambda x#0, lambda x#0, false)) AS aggregate(e, 0, lambdafunction((namedlambdavariable() + namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_append.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_append.explain
new file mode 100644
index 00000000000..ca2804ebb60
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_append.explain
@@ -0,0 +1,2 @@
+Project [array_append(e#0, 1) AS array_append(e, 1)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_compact.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_compact.explain
new file mode 100644
index 00000000000..a78195c4ae2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_compact.explain
@@ -0,0 +1,2 @@
+Project [filter(e#0, lambdafunction(isnotnull(lambda arg#0), lambda arg#0, false)) AS array_compact(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_contains.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_contains.explain
new file mode 100644
index 00000000000..ecfd647863b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_contains.explain
@@ -0,0 +1,2 @@
+Project [array_contains(e#0, 1) AS array_contains(e, 1)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_distinct.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_distinct.explain
new file mode 100644
index 00000000000..efe98a93b01
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_distinct.explain
@@ -0,0 +1,2 @@
+Project [array_distinct(e#0) AS array_distinct(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_except.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_except.explain
new file mode 100644
index 00000000000..5b667f60cb5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_except.explain
@@ -0,0 +1,2 @@
+Project [array_except(e#0, array(1, 2, 4)) AS array_except(e, array(1, 2, 4))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_insert.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_insert.explain
new file mode 100644
index 00000000000..edcd790596b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_insert.explain
@@ -0,0 +1,2 @@
+Project [array_insert(e#0, 0, 1) AS array_insert(e, 0, 1)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_intersect.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_intersect.explain
new file mode 100644
index 00000000000..db862ee9697
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_intersect.explain
@@ -0,0 +1,2 @@
+Project [array_intersect(e#0, array(10, 4)) AS array_intersect(e, array(10, 4))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join.explain
new file mode 100644
index 00000000000..993bb6b8207
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join.explain
@@ -0,0 +1,2 @@
+Project [array_join(cast(e#0 as array<string>), ;, None) AS array_join(e, ;)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join_with_null_replacement.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join_with_null_replacement.explain
new file mode 100644
index 00000000000..0a93be00416
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_join_with_null_replacement.explain
@@ -0,0 +1,2 @@
+Project [array_join(cast(e#0 as array<string>), ;, Some(null)) AS array_join(e, ;, null)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_max.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_max.explain
new file mode 100644
index 00000000000..76a12cb50c5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_max.explain
@@ -0,0 +1,2 @@
+Project [array_max(e#0) AS array_max(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_min.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_min.explain
new file mode 100644
index 00000000000..e11dfe2e471
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_min.explain
@@ -0,0 +1,2 @@
+Project [array_min(e#0) AS array_min(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_position.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_position.explain
new file mode 100644
index 00000000000..cd3ca8313c1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_position.explain
@@ -0,0 +1,2 @@
+Project [array_position(e#0, 10) AS array_position(e, 10)#0L]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_remove.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_remove.explain
new file mode 100644
index 00000000000..c9aea402dc7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_remove.explain
@@ -0,0 +1,2 @@
+Project [array_remove(e#0, 314) AS array_remove(e, 314)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_repeat.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_repeat.explain
new file mode 100644
index 00000000000..f4417df8230
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_repeat.explain
@@ -0,0 +1,2 @@
+Project [array_repeat(a#0, 10) AS array_repeat(a, 10)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort.explain
new file mode 100644
index 00000000000..a8bb75836a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort.explain
@@ -0,0 +1,2 @@
+Project [array_sort(e#0, lambdafunction(if ((isnull(lambda left#0) AND isnull(lambda right#0))) 0 else if (isnull(lambda left#0)) 1 else if (isnull(lambda right#0)) -1 else if ((lambda left#0 < lambda right#0)) -1 else if ((lambda left#0 > lambda right#0)) 1 else 0, lambda left#0, lambda right#0, false), false) AS array_sort(e, lambdafunction((IF(((namedlambdavariable() IS NULL) AND (namedlambdavariable() IS NULL)), 0, (IF((namedlambdavariable() IS NULL), 1, (IF((namedlambdavariable() IS [...]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort_with_comparator.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort_with_comparator.explain
new file mode 100644
index 00000000000..cd86bcc5ffd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_sort_with_comparator.explain
@@ -0,0 +1,2 @@
+Project [array_sort(e#0, lambdafunction((lambda x#0 - lambda y#0), lambda x#0, lambda y#0, false), false) AS array_sort(e, lambdafunction((namedlambdavariable() - namedlambdavariable()), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_union.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_union.explain
new file mode 100644
index 00000000000..31e07099c3f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_array_union.explain
@@ -0,0 +1,2 @@
+Project [array_union(e#0, array(1, 2, 3)) AS array_union(e, array(1, 2, 3))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_overlap.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_overlap.explain
new file mode 100644
index 00000000000..0316f35ff9f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_overlap.explain
@@ -0,0 +1,2 @@
+Project [arrays_overlap(e#0, array(1, 2)) AS arrays_overlap(e, array(1, 2))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_zip.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_zip.explain
new file mode 100644
index 00000000000..0dc3f43b074
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_arrays_zip.explain
@@ -0,0 +1,2 @@
+Project [arrays_zip(e#0, sequence(cast(1 as bigint), cast(20 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)), e, 1) AS arrays_zip(e, sequence(1, 20, 1))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_concat.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_concat.explain
new file mode 100644
index 00000000000..4d765e5a9c3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_concat.explain
@@ -0,0 +1,2 @@
+Project [concat(cast(e#0 as array<bigint>), cast(array(1, 2) as array<bigint>), sequence(cast(33 as bigint), cast(40 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles))) AS concat(e, array(1, 2), sequence(33, 40, 1))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_element_at.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_element_at.explain
new file mode 100644
index 00000000000..45c17a4ccd5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_element_at.explain
@@ -0,0 +1,2 @@
+Project [element_at(f#0, bob, None, false) AS element_at(f, bob)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_exists.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_exists.explain
new file mode 100644
index 00000000000..1fab4ccb3a8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_exists.explain
@@ -0,0 +1,2 @@
+Project [exists(e#0, lambdafunction((lambda x#0 > 10), lambda x#0, false)) AS exists(e, lambdafunction((namedlambdavariable() > 10), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode.explain
new file mode 100644
index 00000000000..1f1792761f6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode.explain
@@ -0,0 +1,3 @@
+Project [col#0]
++- Generate explode(e#0), false, [col#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode_outer.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode_outer.explain
new file mode 100644
index 00000000000..3ee29e734dc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_explode_outer.explain
@@ -0,0 +1,3 @@
+Project [col#0]
++- Generate explode(e#0), true, [col#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter.explain
new file mode 100644
index 00000000000..a92b212666c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter.explain
@@ -0,0 +1,2 @@
+Project [filter(e#0, lambdafunction((lambda x#0 > 10), lambda x#0, false)) AS filter(e, lambdafunction((namedlambdavariable() > 10), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter_with_pair_input.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter_with_pair_input.explain
new file mode 100644
index 00000000000..63ab17bd1e5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_filter_with_pair_input.explain
@@ -0,0 +1,2 @@
+Project [filter(e#0, lambdafunction(((lambda x#0 > 10) AND (lambda y#0 > 2)), lambda x#0, lambda y#0, false)) AS filter(e, lambdafunction(((namedlambdavariable() > 10) AND (namedlambdavariable() > 2)), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_flatten.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_flatten.explain
new file mode 100644
index 00000000000..ebdb5617a86
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_flatten.explain
@@ -0,0 +1,2 @@
+Project [flatten(array(cast(e#0 as array<bigint>), sequence(cast(1 as bigint), cast(10 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)))) AS flatten(array(e, sequence(1, 10, 1)))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_forall.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_forall.explain
new file mode 100644
index 00000000000..e69389808a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_forall.explain
@@ -0,0 +1,2 @@
+Project [forall(e#0, lambdafunction((lambda x#0 > 10), lambda x#0, false)) AS forall(e, lambdafunction((namedlambdavariable() > 10), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_csv.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_csv.explain
new file mode 100644
index 00000000000..89e03c81882
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_csv.explain
@@ -0,0 +1,2 @@
+Project [from_csv(StructField(id,LongType,true), StructField(a,IntegerType,true), StructField(b,DoubleType,true), (mode,FAILFAST), g#0, Some(America/Los_Angeles), None) AS from_csv(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_json.explain
new file mode 100644
index 00000000000..1219f11d469
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_from_json.explain
@@ -0,0 +1,2 @@
+Project [from_json(StructField(id,LongType,true), StructField(a,IntegerType,true), StructField(b,DoubleType,true), g#0, Some(America/Los_Angeles)) AS from_json(g)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_get.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_get.explain
new file mode 100644
index 00000000000..5f3ef82b996
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_get.explain
@@ -0,0 +1,2 @@
+Project [e#0[2] AS get(e, 2)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_get_json_object.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_get_json_object.explain
new file mode 100644
index 00000000000..cfc3e05cd0a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_get_json_object.explain
@@ -0,0 +1,2 @@
+Project [get_json_object(g#0, $.device_type) AS get_json_object(g, $.device_type)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline.explain
new file mode 100644
index 00000000000..8b4c66ac607
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline.explain
@@ -0,0 +1,3 @@
+Project [id#0L, a#0, b#0]
++- Generate inline(map_values(f#0)), false, [id#0L, a#0, b#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline_outer.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline_outer.explain
new file mode 100644
index 00000000000..a94c28d0f2b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_inline_outer.explain
@@ -0,0 +1,3 @@
+Project [id#0L, a#0, b#0]
++- Generate inline(map_values(f#0)), true, [id#0L, a#0, b#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_tuple.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_tuple.explain
new file mode 100644
index 00000000000..5530a36a60b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_json_tuple.explain
@@ -0,0 +1,3 @@
+Project [c0#0, c1#0, c2#0]
++- Generate json_tuple(g#0, a, b, id), false, [c0#0, c1#0, c2#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_concat.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_concat.explain
new file mode 100644
index 00000000000..fb0e86e3485
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_concat.explain
@@ -0,0 +1,2 @@
+Project [map_concat(f#0, map(foo, struct(id, 12, a, 68, b, 2.718281828459045))) AS map_concat(f, map(foo, struct(12 AS id, 68 AS a, 2.718281828459045 AS b)))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_contains_key.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_contains_key.explain
new file mode 100644
index 00000000000..a2bc19114f4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_contains_key.explain
@@ -0,0 +1,2 @@
+Project [array_contains(map_keys(f#0), xyz) AS map_contains_key(f, xyz)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_entries.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_entries.explain
new file mode 100644
index 00000000000..2d9d550396c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_entries.explain
@@ -0,0 +1,2 @@
+Project [map_entries(f#0) AS map_entries(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_filter.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_filter.explain
new file mode 100644
index 00000000000..4e2502d0c98
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_filter.explain
@@ -0,0 +1,2 @@
+Project [map_filter(f#0, lambdafunction(Contains(lambda x#0, baz), lambda x#0, lambda y#0, false)) AS map_filter(f, lambdafunction(contains(namedlambdavariable(), baz), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_from_entries.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_from_entries.explain
new file mode 100644
index 00000000000..737900bef09
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_from_entries.explain
@@ -0,0 +1,2 @@
+Project [map_from_entries(transform(e#0, lambdafunction(struct(y, lambda y#0, x, lambda x#0), lambda x#0, lambda y#0, false))) AS map_from_entries(transform(e, lambdafunction(struct(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable())))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_keys.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_keys.explain
new file mode 100644
index 00000000000..85599d15954
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_keys.explain
@@ -0,0 +1,2 @@
+Project [map_keys(f#0) AS map_keys(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_values.explain
new file mode 100644
index 00000000000..0f93262af1e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_values.explain
@@ -0,0 +1,2 @@
+Project [map_values(f#0) AS map_values(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_zip_with.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_zip_with.explain
new file mode 100644
index 00000000000..2c053fa6558
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_map_zip_with.explain
@@ -0,0 +1,2 @@
+Project [map_zip_with(f#0, f#0, lambdafunction((lambda y#0.id + lambda z#0.id), lambda x#0, lambda y#0, lambda z#0, false)) AS map_zip_with(f, f, lambdafunction((namedlambdavariable().id + namedlambdavariable().id), namedlambdavariable(), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode.explain
new file mode 100644
index 00000000000..39d0fd49866
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode.explain
@@ -0,0 +1,3 @@
+Project [pos#0, col#0]
++- Generate posexplode(e#0), false, [pos#0, col#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode_outer.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode_outer.explain
new file mode 100644
index 00000000000..c7023ef10b5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_posexplode_outer.explain
@@ -0,0 +1,3 @@
+Project [pos#0, col#0]
++- Generate posexplode(e#0), true, [pos#0, col#0]
+   +- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_reverse.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reverse.explain
new file mode 100644
index 00000000000..c659426e030
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_reverse.explain
@@ -0,0 +1,2 @@
+Project [reverse(e#0) AS reverse(e)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_csv.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_csv.explain
new file mode 100644
index 00000000000..ecd181a4292
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_csv.explain
@@ -0,0 +1,2 @@
+Project [schema_of_csv(1|abc, (sep,|)) AS schema_of_csv(1|abc)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json.explain
new file mode 100644
index 00000000000..8ec799bc580
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json.explain
@@ -0,0 +1,2 @@
+Project [schema_of_json([{"col":01}]) AS schema_of_json([{"col":01}])#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json_with_options.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json_with_options.explain
new file mode 100644
index 00000000000..13867949177
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_schema_of_json_with_options.explain
@@ -0,0 +1,2 @@
+Project [schema_of_json([{"col":01}], (allowNumericLeadingZeros,true)) AS schema_of_json([{"col":01}])#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_sequence.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sequence.explain
new file mode 100644
index 00000000000..2a71190c269
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sequence.explain
@@ -0,0 +1,2 @@
+Project [sequence(cast(1 as bigint), cast(10 as bigint), Some(cast(1 as bigint)), Some(America/Los_Angeles)) AS sequence(1, 10, 1)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_size.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_size.explain
new file mode 100644
index 00000000000..05ae4511bf8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_size.explain
@@ -0,0 +1,2 @@
+Project [size(f#0, true) AS size(f)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_slice.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_slice.explain
new file mode 100644
index 00000000000..96734d3b1f4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_slice.explain
@@ -0,0 +1,2 @@
+Project [slice(e#0, 0, 5) AS slice(e, 0, 5)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_sort_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sort_array.explain
new file mode 100644
index 00000000000..b9ab76a6d03
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_sort_array.explain
@@ -0,0 +1,2 @@
+Project [sort_array(e#0, true) AS sort_array(e, true)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_csv.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_csv.explain
new file mode 100644
index 00000000000..245ccb1dbff
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_csv.explain
@@ -0,0 +1,2 @@
+Project [to_csv((sep,|), d#0, Some(America/Los_Angeles)) AS to_csv(d)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_json.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_json.explain
new file mode 100644
index 00000000000..cd72b12ee19
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_to_json.explain
@@ -0,0 +1,2 @@
+Project [to_json((timestampFormat,dd/MM/yyyy), d#0, Some(America/Los_Angeles)) AS to_json(d)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform.explain
new file mode 100644
index 00000000000..1eb446551f1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform.explain
@@ -0,0 +1,2 @@
+Project [transform(e#0, lambdafunction((lambda x#0 + 1), lambda x#0, false)) AS transform(e, lambdafunction((namedlambdavariable() + 1), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_keys.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_keys.explain
new file mode 100644
index 00000000000..aae92957bcd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_keys.explain
@@ -0,0 +1,2 @@
+Project [transform_keys(f#0, lambdafunction(concat(lambda x#0, cast(lambda y#0.id as string)), lambda x#0, lambda y#0, false)) AS transform_keys(f, lambdafunction(concat(namedlambdavariable(), namedlambdavariable().id), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_values.explain
new file mode 100644
index 00000000000..3837ff0b78f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_values.explain
@@ -0,0 +1,2 @@
+Project [transform_values(f#0, lambdafunction(update_fields(lambda y#0, WithField(key, lambda x#0)), lambda x#0, lambda y#0, false)) AS transform_values(f, lambdafunction(update_fields(namedlambdavariable(), WithField(namedlambdavariable())), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_with_index.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_with_index.explain
new file mode 100644
index 00000000000..99c7733b1f7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_transform_with_index.explain
@@ -0,0 +1,2 @@
+Project [transform(e#0, lambdafunction((lambda x#0 + lambda y#0), lambda x#0, lambda y#0, false)) AS transform(e, lambdafunction((namedlambdavariable() + namedlambdavariable()), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_zip_with.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_zip_with.explain
new file mode 100644
index 00000000000..53c92983607
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_zip_with.explain
@@ -0,0 +1,2 @@
+Project [zip_with(e#0, e#0, lambdafunction((lambda x#0 + lambda y#0), lambda x#0, lambda y#0, false)) AS zip_with(e, e, lambdafunction((namedlambdavariable() + namedlambdavariable()), namedlambdavariable(), namedlambdavariable()))#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0, d#0, e#0, f#0, g#0]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.json b/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.json
new file mode 100644
index 00000000000..833350ad0b3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.json
@@ -0,0 +1,56 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "aggregate",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 0
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedNamedLambdaVariable": {
+                "nameParts": ["x"]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.proto.bin
new file mode 100644
index 00000000000..5fec870563f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_aggregate.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.json
new file mode 100644
index 00000000000..7e2a7e300a2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_append",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 1
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.proto.bin
new file mode 100644
index 00000000000..9832323a527
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_append.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_append
+e
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.json
new file mode 100644
index 00000000000..4c81b0c8b38
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_compact",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.proto.bin
new file mode 100644
index 00000000000..28a30d579b2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_compact.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
array_compact
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.json
new file mode 100644
index 00000000000..6e43c2768db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_contains",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 1
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.proto.bin
new file mode 100644
index 00000000000..cdb20044cc9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_contains.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_contains
+e
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.json
new file mode 100644
index 00000000000..c512dd362c4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_distinct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.proto.bin
new file mode 100644
index 00000000000..5adda1b1f45
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_distinct.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_distinct
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.json
new file mode 100644
index 00000000000..f27cce41199
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.json
@@ -0,0 +1,36 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_except",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "integer": 2
+              }
+            }, {
+              "literal": {
+                "integer": 4
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.proto.bin
new file mode 100644
index 00000000000..a7a06cb7ec3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_except.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>42
+array_except
+e
+array
+0
+0
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.json
new file mode 100644
index 00000000000..6c6b9593393
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.json
@@ -0,0 +1,27 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_insert",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 0
+          }
+        }, {
+          "literal": {
+            "integer": 1
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.proto.bin
new file mode 100644
index 00000000000..c18114edbd0
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_array_insert.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.json
new file mode 100644
index 00000000000..5149f675e8c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_intersect",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "literal": {
+                "integer": 10
+              }
+            }, {
+              "literal": {
+                "integer": 4
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.proto.bin
new file mode 100644
index 00000000000..bab4ead039a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_intersect.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>1/
+array_intersect
+e
+array
+0
+
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.json
new file mode 100644
index 00000000000..8dcaad72daf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_join",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "string": ";"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.proto.bin
new file mode 100644
index 00000000000..db6dc1adff8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join.proto.bin
@@ -0,0 +1,6 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+array_join
+e
+j;
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.json
new file mode 100644
index 00000000000..bbad2109ff2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.json
@@ -0,0 +1,27 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_join",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "string": ";"
+          }
+        }, {
+          "literal": {
+            "string": "null"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.proto.bin
new file mode 100644
index 00000000000..0604d19d0c9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_join_with_null_replacement.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>&$
+
+array_join
+e
+j;
+jnull
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.json
new file mode 100644
index 00000000000..1270be928a9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_max",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.proto.bin
new file mode 100644
index 00000000000..f96db47323a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_max.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+	array_max
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.json
new file mode 100644
index 00000000000..e36f2275c4f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_min",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.proto.bin
new file mode 100644
index 00000000000..4b01ad39674
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_min.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+	array_min
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.json
new file mode 100644
index 00000000000..d13d31d467b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_position",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 10
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.proto.bin
new file mode 100644
index 00000000000..c26fbff6f06
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_position.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_position
+e
+0
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.json
new file mode 100644
index 00000000000..99fa5f3a1bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_remove",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 314
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.proto.bin
new file mode 100644
index 00000000000..68b37cdbfbe
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_remove.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_remove
+e
+0�
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.json
new file mode 100644
index 00000000000..b445ce38ee0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_repeat",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "literal": {
+            "integer": 10
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.proto.bin
new file mode 100644
index 00000000000..186b8027bea
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_repeat.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+array_repeat
+a
+0
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.json
new file mode 100644
index 00000000000..2e851777cbf
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_sort",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.proto.bin
new file mode 100644
index 00000000000..da38e74e5db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+array_sort
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.json
new file mode 100644
index 00000000000..df8c94c6257
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.json
@@ -0,0 +1,41 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_sort",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "-",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.proto.bin
new file mode 100644
index 00000000000..3217e0b59d1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_sort_with_comparator.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>86
+
+array_sort
+e!R
+
+-r
+xr
+y
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.json b/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.json
new file mode 100644
index 00000000000..f6e83f316c7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.json
@@ -0,0 +1,36 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "array_union",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "integer": 2
+              }
+            }, {
+              "literal": {
+                "integer": 3
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.proto.bin
new file mode 100644
index 00000000000..951d5681399
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_array_union.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>31
+array_union
+e
+array
+0
+0
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.json b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.json
new file mode 100644
index 00000000000..94c87c477db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "arrays_overlap",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "integer": 2
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.proto.bin
new file mode 100644
index 00000000000..771dd700f13
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_overlap.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>0.
+arrays_overlap
+e
+array
+0
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.json b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.json
new file mode 100644
index 00000000000..80d9c8c85c2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.json
@@ -0,0 +1,36 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "arrays_zip",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "sequence",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "integer": 20
+              }
+            }, {
+              "literal": {
+                "long": "1"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.proto.bin
new file mode 100644
index 00000000000..ea88aa58041
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_arrays_zip.proto.bin
@@ -0,0 +1,9 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>53
+
+arrays_zip
+e
+sequence
+0
+0
+8
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_concat.json b/connector/connect/common/src/test/resources/query-tests/queries/function_concat.json
new file mode 100644
index 00000000000..c2517692977
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_concat.json
@@ -0,0 +1,49 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "concat",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "literal": {
+                "integer": 1
+              }
+            }, {
+              "literal": {
+                "integer": 2
+              }
+            }]
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "sequence",
+            "arguments": [{
+              "literal": {
+                "integer": 33
+              }
+            }, {
+              "literal": {
+                "integer": 40
+              }
+            }, {
+              "literal": {
+                "long": "1"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_concat.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_concat.proto.bin
new file mode 100644
index 00000000000..293ab94e506
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_concat.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>HF
+concat
+e
+array
+0
+0
+sequence
+0!
+0(
+8
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.json b/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.json
new file mode 100644
index 00000000000..a037f4155e2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "element_at",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "literal": {
+            "string": "bob"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.proto.bin
new file mode 100644
index 00000000000..ceb7df30178
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_element_at.proto.bin
@@ -0,0 +1,6 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+element_at
+f
+jbob
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_exists.json b/connector/connect/common/src/test/resources/query-tests/queries/function_exists.json
new file mode 100644
index 00000000000..abf4410f76f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_exists.json
@@ -0,0 +1,39 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "exists",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "\u003e",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "literal": {
+                    "integer": 10
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_exists.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_exists.proto.bin
new file mode 100644
index 00000000000..edd91a6c341
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_exists.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>.,
+exists
+eR
+
+>r
+x
+0
+
+x
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_explode.json b/connector/connect/common/src/test/resources/query-tests/queries/function_explode.json
new file mode 100644
index 00000000000..4f8614806c1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_explode.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "explode",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_explode.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_explode.proto.bin
new file mode 100644
index 00000000000..c95b243aba7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_explode.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+explode
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.json b/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.json
new file mode 100644
index 00000000000..3ac00c24995
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "explode_outer",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.proto.bin
new file mode 100644
index 00000000000..151dce8fff6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_explode_outer.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
explode_outer
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_filter.json b/connector/connect/common/src/test/resources/query-tests/queries/function_filter.json
new file mode 100644
index 00000000000..687f6c0bc23
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_filter.json
@@ -0,0 +1,39 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "filter",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "\u003e",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "literal": {
+                    "integer": 10
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_filter.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_filter.proto.bin
new file mode 100644
index 00000000000..85831fc0f71
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_filter.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>.,
+filter
+eR
+
+>r
+x
+0
+
+x
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.json b/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.json
new file mode 100644
index 00000000000..51ad29afbb6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.json
@@ -0,0 +1,59 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "filter",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "and",
+                "arguments": [{
+                  "unresolvedFunction": {
+                    "functionName": "\u003e",
+                    "arguments": [{
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["x"]
+                      }
+                    }, {
+                      "literal": {
+                        "integer": 10
+                      }
+                    }]
+                  }
+                }, {
+                  "unresolvedFunction": {
+                    "functionName": "\u003e",
+                    "arguments": [{
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["y"]
+                      }
+                    }, {
+                      "literal": {
+                        "integer": 2
+                      }
+                    }]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.proto.bin
new file mode 100644
index 00000000000..a378a54db7f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_filter_with_pair_input.proto.bin
@@ -0,0 +1,15 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>PN
+filter
+e=R;
+/-
+and
+>r
+x
+0
+
+>r
+y
+0
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.json b/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.json
new file mode 100644
index 00000000000..88b318c79ac
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.json
@@ -0,0 +1,41 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "flatten",
+        "arguments": [{
+          "unresolvedFunction": {
+            "functionName": "array",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "e"
+              }
+            }, {
+              "unresolvedFunction": {
+                "functionName": "sequence",
+                "arguments": [{
+                  "literal": {
+                    "integer": 1
+                  }
+                }, {
+                  "literal": {
+                    "integer": 10
+                  }
+                }, {
+                  "literal": {
+                    "long": "1"
+                  }
+                }]
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.proto.bin
new file mode 100644
index 00000000000..36f77dcb073
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_flatten.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>=;
+flatten0.
+array
+e
+sequence
+0
+0
+
+8
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_forall.json b/connector/connect/common/src/test/resources/query-tests/queries/function_forall.json
new file mode 100644
index 00000000000..c63c5b5897a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_forall.json
@@ -0,0 +1,39 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "forall",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "\u003e",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "literal": {
+                    "integer": 10
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_forall.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_forall.proto.bin
new file mode 100644
index 00000000000..a515a03d995
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_forall.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>.,
+forall
+eR
+
+>r
+x
+0
+
+x
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.json b/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.json
new file mode 100644
index 00000000000..c23894d1bd1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.json
@@ -0,0 +1,36 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "from_csv",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "id BIGINT,a INT,b DOUBLE"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "mode"
+              }
+            }, {
+              "literal": {
+                "string": "FAILFAST"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.proto.bin
new file mode 100644
index 00000000000..8a01afa326f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_from_csv.proto.bin
@@ -0,0 +1,9 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>RP
+from_csv
+g
+jid BIGINT,a INT,b DOUBLE
+map
+jmode
+
+jFAILFAST
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.json b/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.json
new file mode 100644
index 00000000000..b86b8c12960
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "from_json",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"a\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.proto.bin
new file mode 100644
index 00000000000..23d781bbb92
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_from_json.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>��
+	from_json
+g�
+�j�{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}},{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"double","nullable":true,"metadata":{}}]}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_get.json b/connector/connect/common/src/test/resources/query-tests/queries/function_get.json
new file mode 100644
index 00000000000..0980e6fbdca
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_get.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "get",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 2
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_get.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_get.proto.bin
new file mode 100644
index 00000000000..11f56f6a85a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_get.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+get
+e
+0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.json b/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.json
new file mode 100644
index 00000000000..1e2771d53dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "get_json_object",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "$.device_type"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.proto.bin
new file mode 100644
index 00000000000..b3dc06c1d8f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_get_json_object.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>-+
+get_json_object
+g
+j
$.device_type
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_inline.json b/connector/connect/common/src/test/resources/query-tests/queries/function_inline.json
new file mode 100644
index 00000000000..1e55f7680a2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_inline.json
@@ -0,0 +1,24 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "inline",
+        "arguments": [{
+          "unresolvedFunction": {
+            "functionName": "map_values",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "f"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_inline.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_inline.proto.bin
new file mode 100644
index 00000000000..a0210b7cfc1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_inline.proto.bin
@@ -0,0 +1,6 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>!
+inline
+
+map_values
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.json b/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.json
new file mode 100644
index 00000000000..a89cbd96e21
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.json
@@ -0,0 +1,24 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "inline_outer",
+        "arguments": [{
+          "unresolvedFunction": {
+            "functionName": "map_values",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "f"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.proto.bin
new file mode 100644
index 00000000000..cc316078c3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_inline_outer.proto.bin
@@ -0,0 +1,6 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>'%
+inline_outer
+
+map_values
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.json b/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.json
new file mode 100644
index 00000000000..bcded73e4f9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.json
@@ -0,0 +1,31 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "json_tuple",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "g"
+          }
+        }, {
+          "literal": {
+            "string": "a"
+          }
+        }, {
+          "literal": {
+            "string": "b"
+          }
+        }, {
+          "literal": {
+            "string": "id"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.proto.bin
new file mode 100644
index 00000000000..9b908290bbd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_json_tuple.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>+)
+
+json_tuple
+g
+ja
+jb
+jid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.json
new file mode 100644
index 00000000000..2a1369806c5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.json
@@ -0,0 +1,60 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_concat",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "foo"
+              }
+            }, {
+              "unresolvedFunction": {
+                "functionName": "struct",
+                "arguments": [{
+                  "alias": {
+                    "expr": {
+                      "literal": {
+                        "long": "12"
+                      }
+                    },
+                    "name": ["id"]
+                  }
+                }, {
+                  "alias": {
+                    "expr": {
+                      "literal": {
+                        "integer": 68
+                      }
+                    },
+                    "name": ["a"]
+                  }
+                }, {
+                  "alias": {
+                    "expr": {
+                      "literal": {
+                        "double": 2.718281828459045
+                      }
+                    },
+                    "name": ["b"]
+                  }
+                }]
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.proto.bin
new file mode 100644
index 00000000000..2b03aedd396
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_concat.proto.bin
@@ -0,0 +1,16 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>b`
+
+map_concat
+fKI
+map
+jfoo97
+struct2
+
+
+8id2	
+
+0Da2
+
+	YiW�
+�@b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.json
new file mode 100644
index 00000000000..b7f9c2b1b6b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_contains_key",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "literal": {
+            "string": "xyz"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.proto.bin
new file mode 100644
index 00000000000..6ae21121eb7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_contains_key.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>$"
+map_contains_key
+f
+jxyz
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.json
new file mode 100644
index 00000000000..2cb5a2af142
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_entries",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.proto.bin
new file mode 100644
index 00000000000..f8e5894ffe5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_entries.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+map_entries
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.json
new file mode 100644
index 00000000000..d49f546bedc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.json
@@ -0,0 +1,41 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_filter",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "contains",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "literal": {
+                    "string": "baz"
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.proto.bin
new file mode 100644
index 00000000000..cff017d65eb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_filter.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>A?
+
+map_filter
+f*R(
+
+containsr
+x
+jbaz
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.json
new file mode 100644
index 00000000000..0b4983881f0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.json
@@ -0,0 +1,46 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_from_entries",
+        "arguments": [{
+          "unresolvedFunction": {
+            "functionName": "transform",
+            "arguments": [{
+              "unresolvedAttribute": {
+                "unparsedIdentifier": "e"
+              }
+            }, {
+              "lambdaFunction": {
+                "function": {
+                  "unresolvedFunction": {
+                    "functionName": "struct",
+                    "arguments": [{
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["y"]
+                      }
+                    }, {
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["x"]
+                      }
+                    }]
+                  }
+                },
+                "arguments": [{
+                  "nameParts": ["x"]
+                }, {
+                  "nameParts": ["y"]
+                }]
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.proto.bin
new file mode 100644
index 00000000000..6ef02d02827
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_from_entries.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>RP
+map_from_entries<:
+	transform
+e&R$
+
+structr
+yr
+x
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.json
new file mode 100644
index 00000000000..3719473dfe7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_keys",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.proto.bin
new file mode 100644
index 00000000000..ed11768367a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_keys.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+map_keys
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.json
new file mode 100644
index 00000000000..b935f371cc4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_values",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.proto.bin
new file mode 100644
index 00000000000..df3bf18c8f2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_values.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+map_values
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.json b/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.json
new file mode 100644
index 00000000000..e9206324ed9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.json
@@ -0,0 +1,65 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "map_zip_with",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedExtractValue": {
+                    "child": {
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["y"]
+                      }
+                    },
+                    "extraction": {
+                      "literal": {
+                        "string": "id"
+                      }
+                    }
+                  }
+                }, {
+                  "unresolvedExtractValue": {
+                    "child": {
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["z"]
+                      }
+                    },
+                    "extraction": {
+                      "literal": {
+                        "string": "id"
+                      }
+                    }
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }, {
+              "nameParts": ["z"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.proto.bin
new file mode 100644
index 00000000000..28ed3380aeb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_map_zip_with.proto.bin
@@ -0,0 +1,16 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>^\
+map_zip_with
+f
+f>R<
++)
++b
+r
+y
+jidb
+r
+z
+jid
+x
+y
+z
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.json b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.json
new file mode 100644
index 00000000000..9e6b94da8f7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "posexplode",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.proto.bin
new file mode 100644
index 00000000000..a4676028ae4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode.proto.bin
@@ -0,0 +1,5 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+posexplode
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.json b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.json
new file mode 100644
index 00000000000..a272b1cfc6e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "posexplode_outer",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.proto.bin
new file mode 100644
index 00000000000..f7f093a0bf4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_posexplode_outer.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+posexplode_outer
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.json b/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.json
new file mode 100644
index 00000000000..efb54e0fff8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "reverse",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.proto.bin
new file mode 100644
index 00000000000..43bfdf13e3f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_reverse.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+reverse
+e
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.json b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.json
new file mode 100644
index 00000000000..4548b63ee43
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "schema_of_csv",
+        "arguments": [{
+          "literal": {
+            "string": "1|abc"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "sep"
+              }
+            }, {
+              "literal": {
+                "string": "|"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.proto.bin
new file mode 100644
index 00000000000..130f29db338
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_csv.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>53
+
schema_of_csv	
+j1|abc
+map
+jsep
+j|
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.json b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.json
new file mode 100644
index 00000000000..e2b8452a5a9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "schema_of_json",
+        "arguments": [{
+          "literal": {
+            "string": "[{\"col\":01}]"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.proto.bin
new file mode 100644
index 00000000000..85260e2e944
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>$"
+schema_of_json
+j[{"col":01}]
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.json b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.json
new file mode 100644
index 00000000000..e4017e52169
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "schema_of_json",
+        "arguments": [{
+          "literal": {
+            "string": "[{\"col\":01}]"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "allowNumericLeadingZeros"
+              }
+            }, {
+              "literal": {
+                "string": "true"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.proto.bin
new file mode 100644
index 00000000000..dd8cc70aec6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_schema_of_json_with_options.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>US
+schema_of_json
+j[{"col":01}]/-
+map
+jallowNumericLeadingZeros
+jtrue
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.json b/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.json
new file mode 100644
index 00000000000..646c0d974e5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.json
@@ -0,0 +1,27 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "sequence",
+        "arguments": [{
+          "literal": {
+            "integer": 1
+          }
+        }, {
+          "literal": {
+            "integer": 10
+          }
+        }, {
+          "literal": {
+            "long": "1"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.proto.bin
new file mode 100644
index 00000000000..79a44c8cff8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_sequence.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+sequence
+0
+0
+
+8
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_size.json b/connector/connect/common/src/test/resources/query-tests/queries/function_size.json
new file mode 100644
index 00000000000..e75665d8144
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_size.json
@@ -0,0 +1,19 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "size",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_size.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_size.proto.bin
new file mode 100644
index 00000000000..a0dbe3709cb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_size.proto.bin
@@ -0,0 +1,4 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+size
+f
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_slice.json b/connector/connect/common/src/test/resources/query-tests/queries/function_slice.json
new file mode 100644
index 00000000000..497a1626eef
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_slice.json
@@ -0,0 +1,27 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "slice",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "integer": 0
+          }
+        }, {
+          "literal": {
+            "integer": 5
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_slice.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_slice.proto.bin
new file mode 100644
index 00000000000..3e1ff1ec8c8
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_slice.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.json b/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.json
new file mode 100644
index 00000000000..e8b279c0a93
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.json
@@ -0,0 +1,23 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "sort_array",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "literal": {
+            "boolean": true
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.proto.bin
new file mode 100644
index 00000000000..9fe1405d5d2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_sort_array.proto.bin
@@ -0,0 +1,6 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>
+
+sort_array
+e
+
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.json
new file mode 100644
index 00000000000..e06e9f57e1e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "to_csv",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "d"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "sep"
+              }
+            }, {
+              "literal": {
+                "string": "|"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.proto.bin
new file mode 100644
index 00000000000..fb670fcccee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_csv.proto.bin
@@ -0,0 +1,7 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>*(
+to_csv
+d
+map
+jsep
+j|
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.json b/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.json
new file mode 100644
index 00000000000..e58801c815e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.json
@@ -0,0 +1,32 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "to_json",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "d"
+          }
+        }, {
+          "unresolvedFunction": {
+            "functionName": "map",
+            "arguments": [{
+              "literal": {
+                "string": "timestampFormat"
+              }
+            }, {
+              "literal": {
+                "string": "dd/MM/yyyy"
+              }
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.proto.bin
new file mode 100644
index 00000000000..6cb8d1290aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_to_json.proto.bin
@@ -0,0 +1,8 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>@>
+to_json
+d,*
+map
+jtimestampFormat
+j
+dd/MM/yyyy
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform.json b/connector/connect/common/src/test/resources/query-tests/queries/function_transform.json
new file mode 100644
index 00000000000..bc8749cc022
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform.json
@@ -0,0 +1,39 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "transform",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "literal": {
+                    "integer": 1
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_transform.proto.bin
new file mode 100644
index 00000000000..73e56968131
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform.proto.bin
@@ -0,0 +1,9 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>1/
+	transform
+eR
+
++r
+x
+0
+x
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.json b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.json
new file mode 100644
index 00000000000..400c2b0cc86
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.json
@@ -0,0 +1,50 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "transform_keys",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "concat",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedExtractValue": {
+                    "child": {
+                      "unresolvedNamedLambdaVariable": {
+                        "nameParts": ["y"]
+                      }
+                    },
+                    "extraction": {
+                      "literal": {
+                        "string": "id"
+                      }
+                    }
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.proto.bin
new file mode 100644
index 00000000000..97533b22067
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_keys.proto.bin
@@ -0,0 +1,12 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>MK
+transform_keys
+f2R0
+$"
+concatr
+xb
+r
+y
+jid
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.json b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.json
new file mode 100644
index 00000000000..97a80e48457
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.json
@@ -0,0 +1,42 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "transform_values",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "f"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "updateFields": {
+                "structExpression": {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                },
+                "fieldName": "key",
+                "valueExpression": {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.proto.bin
new file mode 100644
index 00000000000..7118b0b15a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_values.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>@>
+transform_values
+f#R!
+j
+r
+ykeyr
+x
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.json b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.json
new file mode 100644
index 00000000000..7f538460838
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.json
@@ -0,0 +1,41 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "transform",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.proto.bin
new file mode 100644
index 00000000000..41c1878fbf6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_transform_with_index.proto.bin
@@ -0,0 +1,10 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>75
+	transform
+e!R
+
++r
+xr
+y
+x
+y
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.json b/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.json
new file mode 100644
index 00000000000..a7c603ed6bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.json
@@ -0,0 +1,45 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double,d:struct\u003cid:bigint,a:int,b:double\u003e,e:array\u003cint\u003e,f:map\u003cstring,struct\u003cid:bigint,a:int,b:double\u003e\u003e,g:string\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "zip_with",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "e"
+          }
+        }, {
+          "lambdaFunction": {
+            "function": {
+              "unresolvedFunction": {
+                "functionName": "+",
+                "arguments": [{
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["x"]
+                  }
+                }, {
+                  "unresolvedNamedLambdaVariable": {
+                    "nameParts": ["y"]
+                  }
+                }]
+              }
+            },
+            "arguments": [{
+              "nameParts": ["x"]
+            }, {
+              "nameParts": ["y"]
+            }]
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.proto.bin
new file mode 100644
index 00000000000..2d6bec136a7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_zip_with.proto.bin
@@ -0,0 +1,11 @@
+�
+�Z��struct<id:bigint,a:int,b:double,d:struct<id:bigint,a:int,b:double>,e:array<int>,f:map<string,struct<id:bigint,a:int,b:double>>,g:string>=;
+zip_with
+e
+e!R
+
++r
+xr
+y
+x
+y
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org