Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/15 14:54:34 UTC

[spark] branch branch-3.4 updated: [SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new a46278f98b3 [SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client
a46278f98b3 is described below

commit a46278f98b3500dd0ec22d4580f954c8c8a1ddb3
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Wed Feb 15 10:54:06 2023 -0400

    [SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client
    
    ### What changes were proposed in this pull request?
    Add a lot of the existing Dataframe APIs to the Spark Connect Scala Client.
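    
    A rough usage sketch of the surface added here (`spark` is assumed to be a Spark Connect `SparkSession`; the query and column names are purely illustrative):
    
    ```scala
    import org.apache.spark.sql.functions.col
    
    val df = spark.sql("select id, id % 3 as grp from range(30)")
    df.filter("id > 5")
      .sort(col("id").desc_nulls_last)
      .withColumnRenamed("grp", "bucket")
      .limit(10)
      .show()
    ```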
    
    This PR does not contain:
    - Typed APIs
    - Aggregation
    - Streaming (not supported by connect just yet)
    - NA/Stats functions
    - TempView registration.
    
    ### Why are the changes needed?
    We want the Scala Client Dataset to reach parity with the existing Dataset.
    
    ### How was this patch tested?
    Added a lot of golden tests.
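    
    Each golden test, roughly, builds a plan with the new client API and compares the generated proto plan against checked-in JSON/binary (and explain output) files under `query-tests/`. A hypothetical case (the `simple` DataFrame fixture name is an assumption):
    
    ```scala
    test("limit") {
      simple.limit(10)
    }
    ```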
    
    Added a number of test cases to the E2E suite for the functionality that requires server interaction.
    
    Closes #40019 from hvanhovell/SPARK-42440.
    
    Authored-by: Herman van Hovell <he...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 9843c7c9a259250f94678e0502fa21f58c80d27c)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Column.scala   |  117 ++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 2201 +++++++++++++++++++-
 .../scala/org/apache/spark/sql/SparkSession.scala  |   16 +-
 .../sql/connect/client/SparkConnectClient.scala    |    3 +-
 .../scala/org/apache/spark/sql/functions.scala     |   14 +
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  184 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |    4 +
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  325 ++-
 .../org/apache/spark/sql/SparkSessionSuite.scala   |    2 +-
 .../sql/connect/client/CompatibilitySuite.scala    |    4 +-
 .../connect/common}/DataTypeProtoConverter.scala   |   29 +-
 .../sql/connect/common/InvalidPlanInput.scala      |   25 +
 .../explain-results/alias_string.explain           |    1 +
 .../explain-results/alias_symbol.explain           |    1 +
 .../query-tests/explain-results/apply.explain      |    2 +
 .../query-tests/explain-results/as_string.explain  |    1 +
 .../query-tests/explain-results/as_symbol.explain  |    1 +
 .../query-tests/explain-results/coalesce.explain   |    2 +
 .../query-tests/explain-results/col.explain        |    2 +
 .../query-tests/explain-results/colRegex.explain   |    2 +
 .../query-tests/explain-results/crossJoin.explain  |    3 +
 .../query-tests/explain-results/describe.explain   |    6 +
 .../query-tests/explain-results/distinct.explain   |    2 +
 .../explain-results/dropDuplicates.explain         |    2 +
 .../dropDuplicates_names_array.explain             |    2 +
 .../dropDuplicates_names_seq.explain               |    2 +
 .../explain-results/dropDuplicates_varargs.explain |    2 +
 .../explain-results/drop_multiple_column.explain   |    2 +
 .../explain-results/drop_multiple_strings.explain  |    2 +
 .../explain-results/drop_single_column.explain     |    2 +
 .../explain-results/drop_single_string.explain     |    2 +
 .../query-tests/explain-results/except.explain     |    3 +
 .../query-tests/explain-results/exceptAll.explain  |    3 +
 .../explain-results/filter_expr.explain            |    2 +
 .../explain-results/function_udf.explain           |    2 +
 .../query-tests/explain-results/hint.explain       |    2 +
 .../query-tests/explain-results/intersect.explain  |    3 +
 .../explain-results/intersectAll.explain           |    3 +
 .../explain-results/join_condition.explain         |    3 +
 .../explain-results/join_inner_condition.explain   |    3 +
 .../join_inner_no_condition.explain                |    3 +
 .../join_inner_using_multiple_col_array.explain    |    3 +
 .../join_inner_using_multiple_col_seq.explain      |    3 +
 .../join_inner_using_single_col.explain            |    3 +
 .../join_using_multiple_col_array.explain          |    3 +
 .../join_using_multiple_col_seq.explain            |    3 +
 .../explain-results/join_using_single_col.explain  |    3 +
 .../explain-results/melt_no_values.explain         |    2 +
 .../explain-results/melt_values.explain            |    2 +
 .../query-tests/explain-results/offset.explain     |    2 +
 .../explain-results/orderBy_columns.explain        |    2 +
 .../explain-results/orderBy_strings.explain        |    2 +
 .../explain-results/repartition.explain            |    2 +
 .../repartitionByRange_expressions.explain         |    2 +
 ...itionByRange_num_partitions_expressions.explain |    2 +
 .../repartition_expressions.explain                |    2 +
 .../repartition_num_partitions_expressions.explain |    2 +
 .../explain-results/sample_fraction_seed.explain   |    2 +
 .../sample_withReplacement_fraction_seed.explain   |    2 +
 .../query-tests/explain-results/selectExpr.explain |    2 +
 .../explain-results/select_strings.explain         |    2 +
 .../sortWithinPartitions_columns.explain           |    2 +
 .../sortWithinPartitions_strings.explain           |    2 +
 .../explain-results/sort_columns.explain           |    2 +
 .../explain-results/sort_strings.explain           |    2 +
 .../query-tests/explain-results/summary.explain    |    5 +
 .../query-tests/explain-results/to.explain         |    2 +
 .../query-tests/explain-results/toDF.explain       |    2 +
 .../query-tests/explain-results/union.explain      |    3 +
 .../query-tests/explain-results/unionAll.explain   |    3 +
 .../explain-results/unionByName.explain            |    3 +
 .../unionByName_allowMissingColumns.explain        |    3 +
 .../explain-results/unpivot_no_values.explain      |    2 +
 .../explain-results/unpivot_values.explain         |    2 +
 .../explain-results/where_column.explain           |    2 +
 .../query-tests/explain-results/where_expr.explain |    2 +
 .../withColumnRenamed_java_map.explain             |    2 +
 .../withColumnRenamed_scala_map.explain            |    2 +
 .../withColumnRenamed_single.explain               |    2 +
 .../explain-results/withColumn_single.explain      |    2 +
 .../explain-results/withColumns_java_map.explain   |    2 +
 .../explain-results/withColumns_scala_map.explain  |    2 +
 .../explain-results/withMetadata.explain           |    2 +
 .../query-tests/queries/alias_string.json          |   10 +
 .../query-tests/queries/alias_string.proto.bin     |    2 +
 .../query-tests/queries/alias_symbol.json          |   10 +
 .../query-tests/queries/alias_symbol.proto.bin     |    2 +
 .../test/resources/query-tests/queries/apply.json  |   14 +
 .../resources/query-tests/queries/apply.proto.bin  |    3 +
 .../resources/query-tests/queries/as_string.json   |   10 +
 .../query-tests/queries/as_string.proto.bin        |    2 +
 .../resources/query-tests/queries/as_symbol.json   |   10 +
 .../query-tests/queries/as_symbol.proto.bin        |    2 +
 .../resources/query-tests/queries/coalesce.json    |   11 +
 .../query-tests/queries/coalesce.proto.bin         |  Bin 0 -> 45 bytes
 .../test/resources/query-tests/queries/col.json    |   18 +
 .../resources/query-tests/queries/col.proto.bin    |    4 +
 .../resources/query-tests/queries/colRegex.json    |   14 +
 .../query-tests/queries/colRegex.proto.bin         |    3 +
 .../resources/query-tests/queries/crossJoin.json   |   15 +
 .../query-tests/queries/crossJoin.proto.bin        |    2 +
 .../resources/query-tests/queries/describe.json    |   10 +
 .../query-tests/queries/describe.proto.bin         |    2 +
 .../resources/query-tests/queries/distinct.json    |   10 +
 .../query-tests/queries/distinct.proto.bin         |    2 +
 .../query-tests/queries/dropDuplicates.json        |   10 +
 .../query-tests/queries/dropDuplicates.proto.bin   |    2 +
 .../queries/dropDuplicates_names_array.json        |   10 +
 .../queries/dropDuplicates_names_array.proto.bin   |    2 +
 .../queries/dropDuplicates_names_seq.json          |   10 +
 .../queries/dropDuplicates_names_seq.proto.bin     |    2 +
 .../queries/dropDuplicates_varargs.json            |   10 +
 .../queries/dropDuplicates_varargs.proto.bin       |    2 +
 .../query-tests/queries/drop_multiple_column.json  |   18 +
 .../queries/drop_multiple_column.proto.bin         |    4 +
 .../query-tests/queries/drop_multiple_strings.json |   22 +
 .../queries/drop_multiple_strings.proto.bin        |    5 +
 .../query-tests/queries/drop_single_column.json    |   14 +
 .../queries/drop_single_column.proto.bin           |    3 +
 .../query-tests/queries/drop_single_string.json    |   14 +
 .../queries/drop_single_string.proto.bin           |    3 +
 .../test/resources/query-tests/queries/except.json |   16 +
 .../resources/query-tests/queries/except.proto.bin |  Bin 0 -> 82 bytes
 .../resources/query-tests/queries/exceptAll.json   |   16 +
 .../query-tests/queries/exceptAll.proto.bin        |    2 +
 .../resources/query-tests/queries/filter_expr.json |   14 +
 .../query-tests/queries/filter_expr.proto.bin      |    3 +
 .../query-tests/queries/function_udf.json          |   96 +
 .../query-tests/queries/function_udf.proto.bin     |  Bin 0 -> 11257 bytes
 .../test/resources/query-tests/queries/hint.json   |   15 +
 .../resources/query-tests/queries/hint.proto.bin   |    3 +
 .../resources/query-tests/queries/intersect.json   |   16 +
 .../query-tests/queries/intersect.proto.bin        |  Bin 0 -> 82 bytes
 .../query-tests/queries/intersectAll.json          |   16 +
 .../query-tests/queries/intersectAll.proto.bin     |    2 +
 .../query-tests/queries/join_condition.json        |   29 +
 .../query-tests/queries/join_condition.proto.bin   |    5 +
 .../query-tests/queries/join_inner_condition.json  |   29 +
 .../queries/join_inner_condition.proto.bin         |    5 +
 .../queries/join_inner_no_condition.json           |   15 +
 .../queries/join_inner_no_condition.proto.bin      |    2 +
 .../join_inner_using_multiple_col_array.json       |   16 +
 .../join_inner_using_multiple_col_array.proto.bin  |    2 +
 .../queries/join_inner_using_multiple_col_seq.json |   16 +
 .../join_inner_using_multiple_col_seq.proto.bin    |    2 +
 .../queries/join_inner_using_single_col.json       |   16 +
 .../queries/join_inner_using_single_col.proto.bin  |    2 +
 .../queries/join_using_multiple_col_array.json     |   16 +
 .../join_using_multiple_col_array.proto.bin        |    2 +
 .../queries/join_using_multiple_col_seq.json       |   16 +
 .../queries/join_using_multiple_col_seq.proto.bin  |    2 +
 .../query-tests/queries/join_using_single_col.json |   16 +
 .../queries/join_using_single_col.proto.bin        |    2 +
 .../query-tests/queries/melt_no_values.json        |   19 +
 .../query-tests/queries/melt_no_values.proto.bin   |    4 +
 .../resources/query-tests/queries/melt_values.json |   22 +
 .../query-tests/queries/melt_values.proto.bin      |    5 +
 .../test/resources/query-tests/queries/offset.json |   10 +
 .../resources/query-tests/queries/offset.proto.bin |    2 +
 .../query-tests/queries/orderBy_columns.json       |   35 +
 .../query-tests/queries/orderBy_columns.proto.bin  |  Bin 0 -> 82 bytes
 .../query-tests/queries/orderBy_strings.json       |   35 +
 .../query-tests/queries/orderBy_strings.proto.bin  |  Bin 0 -> 82 bytes
 .../resources/query-tests/queries/repartition.json |   11 +
 .../query-tests/queries/repartition.proto.bin      |    2 +
 .../queries/repartitionByRange_expressions.json    |   30 +
 .../repartitionByRange_expressions.proto.bin       |    6 +
 ...artitionByRange_num_partitions_expressions.json |   31 +
 ...ionByRange_num_partitions_expressions.proto.bin |    6 +
 .../queries/repartition_expressions.json           |   18 +
 .../queries/repartition_expressions.proto.bin      |    4 +
 .../repartition_num_partitions_expressions.json    |   19 +
 ...epartition_num_partitions_expressions.proto.bin |    4 +
 .../query-tests/queries/sample_fraction_seed.json  |   12 +
 .../queries/sample_fraction_seed.proto.bin         |  Bin 0 -> 56 bytes
 .../sample_withReplacement_fraction_seed.json      |   12 +
 .../sample_withReplacement_fraction_seed.proto.bin |    3 +
 .../resources/query-tests/queries/selectExpr.json  |   18 +
 .../query-tests/queries/selectExpr.proto.bin       |    4 +
 .../query-tests/queries/select_strings.json        |   18 +
 .../query-tests/queries/select_strings.proto.bin   |    4 +
 .../queries/sortWithinPartitions_columns.json      |   27 +
 .../queries/sortWithinPartitions_columns.proto.bin |  Bin 0 -> 69 bytes
 .../queries/sortWithinPartitions_strings.json      |   27 +
 .../queries/sortWithinPartitions_strings.proto.bin |  Bin 0 -> 69 bytes
 .../query-tests/queries/sort_columns.json          |   27 +
 .../query-tests/queries/sort_columns.proto.bin     |  Bin 0 -> 69 bytes
 .../query-tests/queries/sort_strings.json          |   27 +
 .../query-tests/queries/sort_strings.proto.bin     |  Bin 0 -> 68 bytes
 .../resources/query-tests/queries/summary.json     |   10 +
 .../query-tests/queries/summary.proto.bin          |    2 +
 .../src/test/resources/query-tests/queries/to.json |   28 +
 .../resources/query-tests/queries/to.proto.bin     |  Bin 0 -> 69 bytes
 .../test/resources/query-tests/queries/toDF.json   |   10 +
 .../resources/query-tests/queries/toDF.proto.bin   |    2 +
 .../test/resources/query-tests/queries/union.json  |   16 +
 .../resources/query-tests/queries/union.proto.bin  |    2 +
 .../resources/query-tests/queries/unionAll.json    |   16 +
 .../query-tests/queries/unionAll.proto.bin         |    2 +
 .../resources/query-tests/queries/unionByName.json |   18 +
 .../query-tests/queries/unionByName.proto.bin      |  Bin 0 -> 92 bytes
 .../queries/unionByName_allowMissingColumns.json   |   18 +
 .../unionByName_allowMissingColumns.proto.bin      |    2 +
 .../query-tests/queries/unpivot_no_values.json     |   15 +
 .../queries/unpivot_no_values.proto.bin            |    3 +
 .../query-tests/queries/unpivot_values.json        |   26 +
 .../query-tests/queries/unpivot_values.proto.bin   |    6 +
 .../query-tests/queries/where_column.json          |   23 +
 .../query-tests/queries/where_column.proto.bin     |    5 +
 .../resources/query-tests/queries/where_expr.json  |   14 +
 .../query-tests/queries/where_expr.proto.bin       |    3 +
 .../queries/withColumnRenamed_java_map.json        |   13 +
 .../queries/withColumnRenamed_java_map.proto.bin   |    5 +
 .../queries/withColumnRenamed_scala_map.json       |   13 +
 .../queries/withColumnRenamed_scala_map.proto.bin  |    5 +
 .../queries/withColumnRenamed_single.json          |   12 +
 .../queries/withColumnRenamed_single.proto.bin     |    3 +
 .../query-tests/queries/withColumn_single.json     |   17 +
 .../queries/withColumn_single.proto.bin            |    4 +
 .../query-tests/queries/withColumns_java_map.json  |   24 +
 .../queries/withColumns_java_map.proto.bin         |    6 +
 .../query-tests/queries/withColumns_scala_map.json |   24 +
 .../queries/withColumns_scala_map.proto.bin        |    7 +
 .../query-tests/queries/withMetadata.json          |   18 +
 .../query-tests/queries/withMetadata.proto.bin     |    4 +
 .../org/apache/spark/sql/connect/dsl/package.scala |    5 +-
 .../planner/LiteralValueProtoConverter.scala       |    1 +
 .../sql/connect/planner/SaveModeConverter.scala    |   49 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |    9 +-
 .../sql/connect/service/SparkConnectService.scala  |    3 +-
 .../connect/planner/SparkConnectPlannerSuite.scala |    1 +
 .../connect/planner/SparkConnectProtoSuite.scala   |    7 +-
 .../plugin/SparkConnectPluginRegistrySuite.scala   |    3 +-
 233 files changed, 4577 insertions(+), 85 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index 1154b1e516c..1634c2d5b05 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 import scala.collection.JavaConverters._
 
 import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.Expression.SortOrder.NullOrdering
+import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Column.fn
 import org.apache.spark.sql.connect.client.unsupported
@@ -95,6 +97,121 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logg
   def name(alias: String): Column = Column { builder =>
     builder.getAliasBuilder.addName(alias).setExpr(expr)
   }
+
+  /**
+   * Returns a sort expression based on the descending order of the column.
+   * {{{
+   *   // Scala
+   *   df.sort(df("age").desc)
+   *
+   *   // Java
+   *   df.sort(df.col("age").desc());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 1.3.0
+   */
+  def desc: Column = desc_nulls_last
+
+  /**
+   * Returns a sort expression based on the descending order of the column, and null values appear
+   * before non-null values.
+   * {{{
+   *   // Scala: sort a DataFrame by age column in descending order and null values appearing first.
+   *   df.sort(df("age").desc_nulls_first)
+   *
+   *   // Java
+   *   df.sort(df.col("age").desc_nulls_first());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 2.1.0
+   */
+  def desc_nulls_first: Column =
+    buildSortOrder(SortDirection.SORT_DIRECTION_DESCENDING, NullOrdering.SORT_NULLS_FIRST)
+
+  /**
+   * Returns a sort expression based on the descending order of the column, and null values appear
+   * after non-null values.
+   * {{{
+   *   // Scala: sort a DataFrame by age column in descending order and null values appearing last.
+   *   df.sort(df("age").desc_nulls_last)
+   *
+   *   // Java
+   *   df.sort(df.col("age").desc_nulls_last());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 2.1.0
+   */
+  def desc_nulls_last: Column =
+    buildSortOrder(SortDirection.SORT_DIRECTION_DESCENDING, NullOrdering.SORT_NULLS_LAST)
+
+  /**
+   * Returns a sort expression based on ascending order of the column.
+   * {{{
+   *   // Scala: sort a DataFrame by age column in ascending order.
+   *   df.sort(df("age").asc)
+   *
+   *   // Java
+   *   df.sort(df.col("age").asc());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 1.3.0
+   */
+  def asc: Column = asc_nulls_first
+
+  /**
+   * Returns a sort expression based on ascending order of the column, and null values appear
+   * before non-null values.
+   * {{{
+   *   // Scala: sort a DataFrame by age column in ascending order and null values appearing first.
+   *   df.sort(df("age").asc_nulls_first)
+   *
+   *   // Java
+   *   df.sort(df.col("age").asc_nulls_first());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 2.1.0
+   */
+  def asc_nulls_first: Column =
+    buildSortOrder(SortDirection.SORT_DIRECTION_ASCENDING, NullOrdering.SORT_NULLS_FIRST)
+
+  /**
+   * Returns a sort expression based on ascending order of the column, and null values appear
+   * after non-null values.
+   * {{{
+   *   // Scala: sort a DataFrame by age column in ascending order and null values appearing last.
+   *   df.sort(df("age").asc_nulls_last)
+   *
+   *   // Java
+   *   df.sort(df.col("age").asc_nulls_last());
+   * }}}
+   *
+   * @group expr_ops
+   * @since 2.1.0
+   */
+  def asc_nulls_last: Column =
+    buildSortOrder(SortDirection.SORT_DIRECTION_ASCENDING, NullOrdering.SORT_NULLS_LAST)
+
+  private def buildSortOrder(sortDirection: SortDirection, nullOrdering: NullOrdering): Column =
+    Column { builder =>
+      builder.getSortOrderBuilder
+        .setChild(expr)
+        .setDirection(sortDirection)
+        .setNullOrdering(nullOrdering)
+    }
+
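+  /**
+   * Converts this expression into a SortOrder protobuf. If the expression is not already a sort
+   * order, ascending ordering with nulls first (i.e. `asc`) is used.
+   */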
+  private[sql] def sortOrder: proto.Expression.SortOrder = {
+    val base = if (expr.hasSortOrder) {
+      expr
+    } else {
+      asc.expr
+    }
+    base.getSortOrder
+  }
 }
 
 private[sql] object Column {
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 51b734d1daa..fd0c9320ab4 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -16,60 +16,2215 @@
  */
 package org.apache.spark.sql
 
+import java.util.{Collections, Locale}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
 
 import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.{Metadata, StructType}
+import org.apache.spark.util.Utils
 
+/**
+ * A Dataset is a strongly typed collection of domain-specific objects that can be transformed in
+ * parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a `DataFrame`, which is a Dataset of [[Row]].
+ *
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (`groupBy`).
+ * Example actions include count, show, and writing data out to file systems.
+ *
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked.
+ * Internally, a Dataset represents a logical plan that describes the computation required to
+ * produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan
+ * and generates a physical plan for efficient execution in a parallel and distributed manner. To
+ * explore the logical plan as well as optimized physical plan, use the `explain` function.
+ *
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain specific type `T` to Spark's internal type system. For example, given a class
+ * `Person` with two fields, `name` (string) and `age` (int), an encoder is used to tell Spark to
+ * generate code at runtime to serialize the `Person` object into a binary structure. This binary
+ * structure often has much lower memory footprint as well as are optimized for efficiency in data
+ * processing (e.g. in a columnar format). To understand the internal binary representation for
+ * data, use the `schema` function.
+ *
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark to
+ * some files on storage systems, using the `read` function available on a `SparkSession`.
+ * {{{
+ *   val people = spark.read.parquet("...").as[Person]  // Scala
+ *   Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
+ * }}}
+ *
+ * Datasets can also be created through transformations available on existing Datasets. For
+ * example, the following creates a new Dataset by applying a filter on the existing one:
+ * {{{
+ *   val names = people.map(_.name)  // in Scala; names is a Dataset[String]
+ *   Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING());
+ * }}}
+ *
+ * Dataset operations can also be untyped, through various domain-specific-language (DSL)
+ * functions defined in: Dataset (this class), [[Column]], and [[functions]]. These operations are
+ * very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use `apply` method in Scala and `col` in Java.
+ * {{{
+ *   val ageCol = people("age")  // in Scala
+ *   Column ageCol = people.col("age"); // in Java
+ * }}}
+ *
+ * Note that the [[Column]] type can also be manipulated through its various functions.
+ * {{{
+ *   // The following creates a new column that increases everybody's age by 10.
+ *   people("age") + 10  // in Scala
+ *   people.col("age").plus(10);  // in Java
+ * }}}
+ *
+ * A more concrete example in Scala:
+ * {{{
+ *   // To create Dataset[Row] using SparkSession
+ *   val people = spark.read.parquet("...")
+ *   val department = spark.read.parquet("...")
+ *
+ *   people.filter("age > 30")
+ *     .join(department, people("deptId") === department("id"))
+ *     .groupBy(department("name"), people("gender"))
+ *     .agg(avg(people("salary")), max(people("age")))
+ * }}}
+ *
+ * and in Java:
+ * {{{
+ *   // To create Dataset<Row> using SparkSession
+ *   Dataset<Row> people = spark.read().parquet("...");
+ *   Dataset<Row> department = spark.read().parquet("...");
+ *
+ *   people.filter(people.col("age").gt(30))
+ *     .join(department, people.col("deptId").equalTo(department.col("id")))
+ *     .groupBy(department.col("name"), people.col("gender"))
+ *     .agg(avg(people.col("salary")), max(people.col("age")));
+ * }}}
+ *
+ * @groupname basic Basic Dataset functions
+ * @groupname action Actions
+ * @groupname untypedrel Untyped transformations
+ * @groupname typedrel Typed transformations
+ *
+ * @since 3.4.0
+ */
 class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
     extends Serializable {
 
+  override def toString: String = {
+    try {
+      val builder = new mutable.StringBuilder
+      val fields = schema.take(2).map { f =>
+        s"${f.name}: ${f.dataType.simpleString(2)}"
+      }
+      builder.append("[")
+      builder.append(fields.mkString(", "))
+      if (schema.length > 2) {
+        if (schema.length - fields.size == 1) {
+          builder.append(" ... 1 more field")
+        } else {
+          builder.append(" ... " + (schema.length - 2) + " more fields")
+        }
+      }
+      builder.append("]").toString()
+    } catch {
+      case NonFatal(e) =>
+        s"Invalid Dataframe; ${e.getMessage}"
+    }
+  }
+
   /**
-   * Selects a set of column based expressions.
+   * Converts this strongly typed collection of data to a generic `DataFrame`. In contrast to the
+   * strongly typed objects that Dataset operations work on, a `DataFrame` returns generic [[Row]]
+   * objects that allow fields to be accessed by ordinal or name.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def toDF(): DataFrame = {
+    // Note this will change as soon as we add the typed APIs.
+    this.asInstanceOf[Dataset[Row]]
+  }
+
+  /**
+   * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed.
+   * This can be quite convenient in conversion from an RDD of tuples into a `DataFrame` with
+   * meaningful names. For example:
    * {{{
-   *   ds.select($"colA", $"colB" + 1)
+   *   val rdd: RDD[(Int, String)] = ...
+   *   rdd.toDF()  // this implicit conversion creates a DataFrame with column name `_1` and `_2`
+   *   rdd.toDF("id", "name")  // this creates a DataFrame with column name "id" and "name"
    * }}}
    *
-   * @group untypedrel
+   * @group basic
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = session.newDataset { builder =>
-    builder.getProjectBuilder
+  def toDF(colNames: String*): DataFrame = session.newDataset { builder =>
+    builder.getToDfBuilder
       .setInput(plan.getRoot)
-      .addAllExpressions(cols.map(_.expr).asJava)
+      .addAllColumnNames(colNames.asJava)
   }
 
   /**
-   * Filters rows using the given condition.
+   * Returns a new DataFrame where each row is reconciled to match the specified schema. Spark
+   * will: <ul> <li>Reorder columns and/or inner fields by name to match the specified
+   * schema.</li> <li>Project away columns and/or inner fields that are not needed by the
+   * specified schema. Missing columns and/or inner fields (present in the specified schema but
+   * not input DataFrame) lead to failures.</li> <li>Cast the columns and/or inner fields to match
+   * the data types in the specified schema, if the types are compatible, e.g., numeric to numeric
+   * (error if overflows), but not string to int.</li> <li>Carry over the metadata from the
+   * specified schema, while the columns and/or inner fields still keep their own metadata if not
+   * overwritten by the specified schema.</li> <li>Fail if the nullability is not compatible. For
+   * example, the column and/or inner field is nullable but the specified schema requires them to
+   * be not nullable.</li> </ul>
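+   *
+   * A minimal sketch (the column names and types here are purely illustrative):
+   * {{{
+   *   // df has columns "i: int" and "j: string"; reorder them and widen i to long.
+   *   val schema = new StructType().add("j", StringType).add("i", LongType)
+   *   df.to(schema)
+   * }}}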
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def to(schema: StructType): DataFrame = session.newDataset { builder =>
+    builder.getToSchemaBuilder
+      .setInput(plan.getRoot)
+      .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
+  }
+
+  /**
+   * Returns the schema of this Dataset.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def schema: StructType = {
+    DataTypeProtoConverter.toCatalystType(analyze.getSchema).asInstanceOf[StructType]
+  }
+
+  /**
+   * Prints the schema to the console in a nice tree format.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def printSchema(): Unit = printSchema(Int.MaxValue)
+
+  // scalastyle:off println
+  /**
+   * Prints the schema up to the given level to the console in a nice tree format.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def printSchema(level: Int): Unit = println(schema.treeString(level))
+  // scalastyle:on println
+
+  /**
+   * Prints the plans (logical and physical) with a format specified by a given explain mode.
+   *
+   * @param mode
+   *   specifies the expected output format of plans. <ul> <li>`simple` Print only a physical
+   *   plan.</li> <li>`extended`: Print both logical and physical plans.</li> <li>`codegen`: Print
+   *   a physical plan and generated codes if they are available.</li> <li>`cost`: Print a logical
+   *   plan and statistics if they are available.</li> <li>`formatted`: Split explain output into
+   *   two sections: a physical plan outline and node details.</li> </ul>
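+   *   For example (with `df` being any Dataset):
+   *   {{{
+   *     df.explain("formatted")
+   *   }}}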
+   * @group basic
+   * @since 3.4.0
+   */
+  def explain(mode: String): Unit = {
+    val protoMode = mode.trim.toLowerCase(Locale.ROOT) match {
+      case "simple" => proto.Explain.ExplainMode.SIMPLE
+      case "extended" => proto.Explain.ExplainMode.EXTENDED
+      case "codegen" => proto.Explain.ExplainMode.CODEGEN
+      case "cost" => proto.Explain.ExplainMode.COST
+      case "formatted" => proto.Explain.ExplainMode.FORMATTED
+      case _ => throw new IllegalArgumentException("Unsupported explain mode: " + mode)
+    }
+    explain(protoMode)
+  }
+
+  /**
+   * Prints the plans (logical and physical) to the console for debugging purposes.
+   *
+   * @param extended
+   *   default `false`. If `false`, prints only the physical plan.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def explain(extended: Boolean): Unit = {
+    val mode = if (extended) {
+      proto.Explain.ExplainMode.EXTENDED
+    } else {
+      proto.Explain.ExplainMode.SIMPLE
+    }
+    explain(mode)
+  }
+
+  /**
+   * Prints the physical plan to the console for debugging purposes.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def explain(): Unit = explain(proto.Explain.ExplainMode.SIMPLE)
+
+  private def explain(mode: proto.Explain.ExplainMode): Unit = {
+    // scalastyle:off println
+    println(session.analyze(plan, mode).getExplainString)
+    // scalastyle:on println
+  }
+
+  /**
+   * Returns all column names and their data types as an array.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def dtypes: Array[(String, String)] = schema.fields.map { field =>
+    (field.name, field.dataType.toString)
+  }
+
+  /**
+   * Returns all column names as an array.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def columns: Array[String] = schema.fields.map(_.name)
+
+  /**
+   * Returns true if the `collect` and `take` methods can be run locally (without any Spark
+   * executors).
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def isLocal: Boolean = analyze.getIsLocal
+
+  /**
+   * Returns true if the `Dataset` is empty.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def isEmpty: Boolean = select().limit(1).withResult { result =>
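+    // An empty projection plus limit(1) keeps the transferred result minimal; we only need to
+    // know whether at least one row exists.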
+    result.length == 0
+  }
+
+  /**
+   * Returns true if this Dataset contains one or more sources that continuously return data as it
+   * arrives. A Dataset that reads data from a streaming source must be executed as a
+   * `StreamingQuery` using the `start()` method in `DataStreamWriter`.
+   *
+   * @group streaming
+   * @since 3.4.0
+   */
+  def isStreaming: Boolean = analyze.getIsStreaming
+
+  /**
+   * Displays the Dataset in a tabular form. Strings more than 20 characters will be truncated,
+   * and all cells will be aligned right. For example:
    * {{{
-   *   // The following are equivalent:
-   *   peopleDs.filter($"age" > 15)
-   *   peopleDs.where($"age" > 15)
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
    * }}}
    *
-   * @group typedrel
+   * @param numRows
+   *   Number of rows to show
+   *
+   * @group action
    * @since 3.4.0
    */
-  def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
-    builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
+  def show(numRows: Int): Unit = show(numRows, truncate = true)
+
+  /**
+   * Displays the top 20 rows of Dataset in a tabular form. Strings more than 20 characters will
+   * be truncated, and all cells will be aligned right.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def show(): Unit = show(20)
+
+  /**
+   * Displays the top 20 rows of Dataset in a tabular form.
+   *
+   * @param truncate
+   *   Whether to truncate long strings. If true, strings more than 20 characters will be truncated
+   *   and all cells will be aligned right
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def show(truncate: Boolean): Unit = show(20, truncate)
+
+  /**
+   * Displays the Dataset in a tabular form. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   * @param numRows
+   *   Number of rows to show
+   * @param truncate
+   *   Whether to truncate long strings. If true, strings more than 20 characters will be truncated
+   *   and all cells will be aligned right
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  // scalastyle:off println
+  def show(numRows: Int, truncate: Boolean): Unit = {
+    val truncateValue = if (truncate) 20 else 0
+    show(numRows, truncateValue, vertical = false)
   }
 
   /**
-   * Returns a new Dataset by taking the first `n` rows. The difference between this function and
-   * `head` is that `head` is an action and returns an array (by triggering query execution) while
-   * `limit` returns a new Dataset.
+   * Displays the Dataset in a tabular form. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
    *
-   * @group typedrel
+   * @param numRows
+   *   Number of rows to show
+   * @param truncate
+   *   If set to more than 0, truncates strings to `truncate` characters and all cells will be
+   *   aligned right.
+   * @group action
    * @since 3.4.0
    */
-  def limit(n: Int): Dataset[T] = session.newDataset { builder =>
-    builder.getLimitBuilder
-      .setInput(plan.getRoot)
-      .setLimit(n)
+  def show(numRows: Int, truncate: Int): Unit = show(numRows, truncate, vertical = false)
+
+  /**
+   * Displays the Dataset in a tabular form. For example:
+   * {{{
+   *   year  month AVG('Adj Close) MAX('Adj Close)
+   *   1980  12    0.503218        0.595103
+   *   1981  01    0.523289        0.570307
+   *   1982  02    0.436504        0.475256
+   *   1983  03    0.410516        0.442194
+   *   1984  04    0.450090        0.483521
+   * }}}
+   *
+   * If `vertical` is enabled, this command prints output rows vertically (one line per column
+   * value):
+   *
+   * {{{
+   * -RECORD 0-------------------
+   *  year            | 1980
+   *  month           | 12
+   *  AVG('Adj Close) | 0.503218
+   *  MAX('Adj Close) | 0.595103
+   * -RECORD 1-------------------
+   *  year            | 1981
+   *  month           | 01
+   *  AVG('Adj Close) | 0.523289
+   *  MAX('Adj Close) | 0.570307
+   * -RECORD 2-------------------
+   *  year            | 1982
+   *  month           | 02
+   *  AVG('Adj Close) | 0.436504
+   *  MAX('Adj Close) | 0.475256
+   * -RECORD 3-------------------
+   *  year            | 1983
+   *  month           | 03
+   *  AVG('Adj Close) | 0.410516
+   *  MAX('Adj Close) | 0.442194
+   * -RECORD 4-------------------
+   *  year            | 1984
+   *  month           | 04
+   *  AVG('Adj Close) | 0.450090
+   *  MAX('Adj Close) | 0.483521
+   * }}}
+   *
+   * @param numRows
+   *   Number of rows to show
+   * @param truncate
+   *   If set to more than 0, truncates strings to `truncate` characters and all cells will be
+   *   aligned right.
+   * @param vertical
+   *   If set to true, prints output rows vertically (one line per column value).
+   * @group action
+   * @since 3.4.0
+   */
+  def show(numRows: Int, truncate: Int, vertical: Boolean): Unit = {
+    val df = session.newDataset { builder =>
+      builder.getShowStringBuilder
+        .setInput(plan.getRoot)
+        .setNumRows(numRows)
+        .setTruncate(truncate)
+        .setVertical(vertical)
+    }
+    df.withResult { result =>
+      assert(result.length == 1)
+      assert(result.schema.size == 1)
+      // scalastyle:off println
+      println(result.toArray.head.getString(0))
+    // scalastyle:on println
+    }
+  }
+
+  private def buildJoin(right: Dataset[_])(f: proto.Join.Builder => Unit): DataFrame = {
+    session.newDataset { builder =>
+      val joinBuilder = builder.getJoinBuilder
+      joinBuilder.setLeft(plan.getRoot).setRight(right.plan.getRoot)
+      f(joinBuilder)
+    }
   }
 
-  private[sql] def analyze: proto.AnalyzePlanResponse = session.analyze(plan)
+  private def toJoinType(name: String): proto.Join.JoinType = {
+    name.trim.toLowerCase(Locale.ROOT) match {
+      case "inner" =>
+        proto.Join.JoinType.JOIN_TYPE_INNER
+      case "cross" =>
+        proto.Join.JoinType.JOIN_TYPE_CROSS
+      case "outer" | "full" | "fullouter" | "full_outer" =>
+        proto.Join.JoinType.JOIN_TYPE_FULL_OUTER
+      case "left" | "leftouter" | "left_outer" =>
+        proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER
+      case "right" | "rightouter" | "right_outer" =>
+        proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER
+      case "semi" | "leftsemi" | "left_semi" =>
+        proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI
+      case "anti" | "leftanti" | "left_anti" =>
+        proto.Join.JoinType.JOIN_TYPE_LEFT_ANTI
+      case _ =>
+        throw new IllegalArgumentException(s"Unsupported join type '$name'.")
+    }
+  }
 
-  def collectResult(): SparkResult = session.execute(plan)
+  /**
+   * Join with another `DataFrame`.
+   *
+   * Behaves as an INNER JOIN and requires a subsequent join predicate.
+   *
+   * @param right
+   *   Right side of the join operation.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_]): DataFrame = buildJoin(right) { builder =>
+    builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_INNER)
+  }
+
+  /**
+   * Inner equi-join with another `DataFrame` using the given column.
+   *
+   * Different from other join functions, the join column will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the column "user_id"
+   *   df1.join(df2, "user_id")
+   * }}}
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumn
+   *   Name of the column to join on. This column must exist on both sides.
+   *
+   * @note
+   *   If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+   *   will NOT be able to reference any columns after the join, since there is no way to
+   *   disambiguate which side of the join you would like to reference.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumn: String): DataFrame = {
+    join(right, Seq(usingColumn))
+  }
+
+  /**
+   * (Java-specific) Inner equi-join with another `DataFrame` using the given columns. See the
+   * Scala-specific overload for more details.
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumns
+   *   Names of the columns to join on. These columns must exist on both sides.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Array[String]): DataFrame = {
+    join(right, usingColumns.toSeq)
+  }
+
+  /**
+   * (Scala-specific) Inner equi-join with another `DataFrame` using the given columns.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * {{{
+   *   // Joining df1 and df2 using the columns "user_id" and "user_name"
+   *   df1.join(df2, Seq("user_id", "user_name"))
+   * }}}
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumns
+   *   Names of the columns to join on. These columns must exist on both sides.
+   *
+   * @note
+   *   If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+   *   will NOT be able to reference any columns after the join, since there is no way to
+   *   disambiguate which side of the join you would like to reference.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame = {
+    join(right, usingColumns, "inner")
+  }
+
+  /**
+   * Equi-join with another `DataFrame` using the given column. A cross join with a predicate is
+   * specified as an inner join. If you would explicitly like to perform a cross join use the
+   * `crossJoin` method.
+   *
+   * Different from other join functions, the join column will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumn
+   *   Name of the column to join on. This column must exist on both sides.
+   * @param joinType
+   *   Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+   *   `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+   *   `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+   *   `left_anti`.
+   *
+   * @note
+   *   If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+   *   will NOT be able to reference any columns after the join, since there is no way to
+   *   disambiguate which side of the join you would like to reference.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumn: String, joinType: String): DataFrame = {
+    join(right, Seq(usingColumn), joinType)
+  }
+
+  /**
+   * (Java-specific) Equi-join with another `DataFrame` using the given columns. See the
+   * Scala-specific overload for more details.
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumns
+   *   Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType
+   *   Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+   *   `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+   *   `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+   *   `left_anti`.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Array[String], joinType: String): DataFrame = {
+    join(right, usingColumns.toSeq, joinType)
+  }
+
+  /**
+   * (Scala-specific) Equi-join with another `DataFrame` using the given columns. A cross join
+   * with a predicate is specified as an inner join. If you would explicitly like to perform a
+   * cross join use the `crossJoin` method.
+   *
+   * Different from other join functions, the join columns will only appear once in the output,
+   * i.e. similar to SQL's `JOIN USING` syntax.
+   *
+   * @param right
+   *   Right side of the join operation.
+   * @param usingColumns
+   *   Names of the columns to join on. These columns must exist on both sides.
+   * @param joinType
+   *   Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+   *   `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+   *   `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+   *   `left_anti`.
+   *
+   * @note
+   *   If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+   *   will NOT be able to reference any columns after the join, since there is no way to
+   *   disambiguate which side of the join you would like to reference.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
+    buildJoin(right) { builder =>
+      builder
+        .setJoinType(toJoinType(joinType))
+        .addAllUsingColumns(usingColumns.asJava)
+    }
+  }
+
+  /**
+   * Inner join with another `DataFrame`, using the given join expression.
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   df1.join(df2, $"df1Key" === $"df2Key")
+   *   df1.join(df2).where($"df1Key" === $"df2Key")
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
+
+  /**
+   * Join with another `DataFrame`, using the given join expression. The following performs a full
+   * outer join between `df1` and `df2`.
+   *
+   * {{{
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df1.join(df2, $"df1Key" === $"df2Key", "outer")
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
+   * }}}
+   *
+   * @param right
+   *   Right side of the join.
+   * @param joinExprs
+   *   Join expression.
+   * @param joinType
+   *   Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+   *   `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+   *   `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+   *   `left_anti`.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
+    buildJoin(right) { builder =>
+      builder
+        .setJoinType(toJoinType(joinType))
+        .setJoinCondition(joinExprs.expr)
+    }
+  }
+
+  /**
+   * Explicit cartesian join with another `DataFrame`.
+   *
+   * @param right
+   *   Right side of the join operation.
+   *
+   * @note
+   *   Cartesian joins are very expensive without an extra filter that can be pushed down.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def crossJoin(right: Dataset[_]): DataFrame = buildJoin(right) { builder =>
+    builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS)
+  }
+
+  private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
+    session.newDataset { builder =>
+      builder.getSortBuilder
+        .setInput(plan.getRoot)
+        .setIsGlobal(global)
+        .addAllOrder(sortExprs.map(_.sortOrder).asJava)
+    }
+  }
+
+  /**
+   * Returns a new Dataset with each partition sorted by the given expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
+    sortWithinPartitions((sortCol +: sortCols).map(Column(_)): _*)
+  }
+
+  /**
+   * Returns a new Dataset with each partition sorted by the given expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortExprs: Column*): Dataset[T] = {
+    buildSort(global = false, sortExprs)
+  }
+
+  /**
+   * Returns a new Dataset sorted by the specified column, all in ascending order.
+   * {{{
+   *   // The following 3 are equivalent
+   *   ds.sort("sortcol")
+   *   ds.sort($"sortcol")
+   *   ds.sort($"sortcol".asc)
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sort(sortCol: String, sortCols: String*): Dataset[T] = {
+    sort((sortCol +: sortCols).map(Column(_)): _*)
+  }
+
+  /**
+   * Returns a new Dataset sorted by the given expressions. For example:
+   * {{{
+   *   ds.sort($"col1", $"col2".desc)
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def sort(sortExprs: Column*): Dataset[T] = {
+    buildSort(global = true, sortExprs)
+  }
+
+  /**
+   * Returns a new Dataset sorted by the given expressions. This is an alias of the `sort`
+   * function.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols: _*)
+
+  /**
+   * Returns a new Dataset sorted by the given expressions. This is an alias of the `sort`
+   * function.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs: _*)
+
+  /**
+   * Selects column based on the column name and returns it as a [[Column]].
+   *
+   * @note
+   *   The column name can also reference a nested column like `a.b`.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def apply(colName: String): Column = col(colName)
+
+  /**
+   * Specifies some hint on the current Dataset. As an example, the following code specifies that
+   * one of the plans can be broadcast:
+   *
+   * {{{
+   *   df1.join(df2.hint("broadcast"))
+   * }}}
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def hint(name: String, parameters: Any*): Dataset[T] = session.newDataset { builder =>
+    builder.getHintBuilder
+      .setInput(plan.getRoot)
+      .setName(name)
+      .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava)
+  }
+
+  /**
+   * Selects column based on the column name and returns it as a [[Column]].
+   *
+   * @note
+   *   The column name can also reference a nested column like `a.b`.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def col(colName: String): Column = functions.col(colName)
+
+  /**
+   * Selects column based on the column name specified as a regex and returns it as [[Column]].
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def colRegex(colName: String): Column = Column { builder =>
+    builder.getUnresolvedRegexBuilder.setColName(colName)
+  }
+
+  /**
+   * Returns a new Dataset with an alias set.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def as(alias: String): Dataset[T] = session.newDataset { builder =>
+    builder.getSubqueryAliasBuilder
+      .setInput(plan.getRoot)
+      .setAlias(alias)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset with an alias set.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def as(alias: Symbol): Dataset[T] = as(alias.name)
+
+  /**
+   * Returns a new Dataset with an alias set. Same as `as`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def alias(alias: String): Dataset[T] = as(alias)
+
+  /**
+   * (Scala-specific) Returns a new Dataset with an alias set. Same as `as`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def alias(alias: Symbol): Dataset[T] = as(alias)
+
+  /**
+   * Selects a set of column based expressions.
+   * {{{
+   *   ds.select($"colA", $"colB" + 1)
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def select(cols: Column*): DataFrame = session.newDataset { builder =>
+    builder.getProjectBuilder
+      .setInput(plan.getRoot)
+      .addAllExpressions(cols.map(_.expr).asJava)
+  }
+
+  /**
+   * Selects a set of columns. This is a variant of `select` that can only select existing columns
+   * using column names (i.e. cannot construct expressions).
+   *
+   * {{{
+   *   // The following two are equivalent:
+   *   ds.select("colA", "colB")
+   *   ds.select($"colA", $"colB")
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)): _*)
+
+  /**
+   * Selects a set of SQL expressions. This is a variant of `select` that accepts SQL expressions.
+   *
+   * {{{
+   *   // The following are equivalent:
+   *   ds.selectExpr("colA", "colB as newName", "abs(colC)")
+   *   ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
+   * }}}
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def selectExpr(exprs: String*): DataFrame = {
+    select(exprs.map(functions.expr): _*)
+  }
+
+  /**
+   * Filters rows using the given condition.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDs.filter($"age" > 15)
+   *   peopleDs.where($"age" > 15)
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
+    builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
+  }
+
+  /**
+   * Filters rows using the given SQL expression.
+   * {{{
+   *   peopleDs.filter("age > 15")
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def filter(conditionExpr: String): Dataset[T] = filter(functions.expr(conditionExpr))
+
+  /**
+   * Filters rows using the given condition. This is an alias for `filter`.
+   * {{{
+   *   // The following are equivalent:
+   *   peopleDs.filter($"age" > 15)
+   *   peopleDs.where($"age" > 15)
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def where(condition: Column): Dataset[T] = filter(condition)
+
+  /**
+   * Filters rows using the given SQL expression.
+   * {{{
+   *   peopleDs.where("age > 15")
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def where(conditionExpr: String): Dataset[T] = filter(conditionExpr)
+
+  private def buildUnpivot(
+      ids: Array[Column],
+      valuesOption: Option[Array[Column]],
+      variableColumnName: String,
+      valueColumnName: String): DataFrame = session.newDataset { builder =>
+    val unpivot = builder.getUnpivotBuilder
+      .setInput(plan.getRoot)
+      .addAllIds(ids.toSeq.map(_.expr).asJava)
+      .setVariableColumnName(variableColumnName)
+      .setValueColumnName(valueColumnName)
+    valuesOption.foreach { values =>
+      unpivot.getValuesBuilder
+        .addAllValues(values.toSeq.map(_.expr).asJava)
+    }
+  }
+
+  /**
+   * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+   * set. This is the reverse of `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+   * which cannot be reversed.
+   *
+   * This function is useful to massage a DataFrame into a format where some columns are
+   * identifier columns ("ids"), while all other columns ("values") are "unpivoted" to the rows,
+   * leaving just two non-id columns, named as given by `variableColumnName` and
+   * `valueColumnName`.
+   *
+   * {{{
+   *   val df = Seq((1, 11, 12L), (2, 21, 22L)).toDF("id", "int", "long")
+   *   df.show()
+   *   // output:
+   *   // +---+---+----+
+   *   // | id|int|long|
+   *   // +---+---+----+
+   *   // |  1| 11|  12|
+   *   // |  2| 21|  22|
+   *   // +---+---+----+
+   *
+   *   df.unpivot(Array($"id"), Array($"int", $"long"), "variable", "value").show()
+   *   // output:
+   *   // +---+--------+-----+
+   *   // | id|variable|value|
+   *   // +---+--------+-----+
+   *   // |  1|     int|   11|
+   *   // |  1|    long|   12|
+   *   // |  2|     int|   21|
+   *   // |  2|    long|   22|
+   *   // +---+--------+-----+
+   *   // schema:
+   *   //root
+   *   // |-- id: integer (nullable = false)
+   *   // |-- variable: string (nullable = false)
+   *   // |-- value: long (nullable = true)
+   * }}}
+   *
+   * When no "id" columns are given, the unpivoted DataFrame consists of only the "variable" and
+   * "value" columns.
+   *
+   * All "value" columns must share a least common data type. Unless they are the same data type,
+   * all "value" columns are cast to the nearest common data type. For instance, types
+   * `IntegerType` and `LongType` are cast to `LongType`, while `IntegerType` and `StringType` do
+   * not have a common data type and `unpivot` fails with an `AnalysisException`.
+   *
+   * @param ids
+   *   Id columns
+   * @param values
+   *   Value columns to unpivot
+   * @param variableColumnName
+   *   Name of the variable column
+   * @param valueColumnName
+   *   Name of the value column
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def unpivot(
+      ids: Array[Column],
+      values: Array[Column],
+      variableColumnName: String,
+      valueColumnName: String): DataFrame = {
+    buildUnpivot(ids, Option(values), variableColumnName, valueColumnName)
+  }
+
+  /**
+   * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+   * set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+   * which cannot be reversed.
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+   *
+   * This is equivalent to calling `Dataset#unpivot(Array, Array, String, String)` where `values`
+   * is set to all non-id columns that exist in the DataFrame.
+   *
+   * @param ids
+   *   Id columns
+   * @param variableColumnName
+   *   Name of the variable column
+   * @param valueColumnName
+   *   Name of the value column
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def unpivot(
+      ids: Array[Column],
+      variableColumnName: String,
+      valueColumnName: String): DataFrame = {
+    buildUnpivot(ids, None, variableColumnName, valueColumnName)
+  }
+
+  /**
+   * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+   * set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+   * which cannot be reversed. This is an alias for `unpivot`.
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+   *
+   * @param ids
+   *   Id columns
+   * @param values
+   *   Value columns to unpivot
+   * @param variableColumnName
+   *   Name of the variable column
+   * @param valueColumnName
+   *   Name of the value column
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def melt(
+      ids: Array[Column],
+      values: Array[Column],
+      variableColumnName: String,
+      valueColumnName: String): DataFrame =
+    unpivot(ids, values, variableColumnName, valueColumnName)
+
+  /**
+   * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+   * set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+   * which cannot be reversed. This is an alias for `unpivot`.
+   *
+   * @see
+   *   `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+   *
+   * This is equivalent to calling `Dataset#unpivot(Array, Array, String, String)` where `values`
+   * is set to all non-id columns that exist in the DataFrame.
+   *
+   * @param ids
+   *   Id columns
+   * @param variableColumnName
+   *   Name of the variable column
+   * @param valueColumnName
+   *   Name of the value column
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def melt(ids: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame =
+    unpivot(ids, variableColumnName, valueColumnName)
+
+  /**
+   * Returns a new Dataset by taking the first `n` rows. The difference between this function and
+   * `head` is that `head` is an action and returns an array (by triggering query execution) while
+   * `limit` returns a new Dataset.
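+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   // Lazily restrict the result to at most 5 rows; nothing is executed yet.
+   *   ds.limit(5)
+   * }}}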
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def limit(n: Int): Dataset[T] = session.newDataset { builder =>
+    builder.getLimitBuilder
+      .setInput(plan.getRoot)
+      .setLimit(n)
+  }
+
+  /**
+   * Returns a new Dataset by skipping the first `n` rows.
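+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   // Skip the first 10 rows, then keep the next 10 (a simple pagination pattern).
+   *   ds.offset(10).limit(10)
+   * }}}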
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def offset(n: Int): Dataset[T] = session.newDataset { builder =>
+    builder.getOffsetBuilder
+      .setInput(plan.getRoot)
+      .setOffset(n)
+  }
+
+  private def buildSetOp(right: Dataset[T], setOpType: proto.SetOperation.SetOpType)(
+      f: proto.SetOperation.Builder => Unit): Dataset[T] = {
+    session.newDataset { builder =>
+      f(
+        builder.getSetOpBuilder
+          .setSetOpType(setOpType)
+          .setLeftInput(plan.getRoot)
+          .setRightInput(right.plan.getRoot))
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
+   *
+   * This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union (that does
+   * deduplication of elements), use this function followed by a [[distinct]].
+   *
+   * Also as standard in SQL, this function resolves columns by position (not by name):
+   *
+   * {{{
+   *   val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+   *   val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+   *   df1.union(df2).show
+   *
+   *   // output:
+   *   // +----+----+----+
+   *   // |col0|col1|col2|
+   *   // +----+----+----+
+   *   // |   1|   2|   3|
+   *   // |   4|   5|   6|
+   *   // +----+----+----+
+   * }}}
+   *
+   * Notice that the column positions in the schema aren't necessarily matched with the fields in
+   * the strongly typed objects in a Dataset. This function resolves columns by their positions in
+   * the schema, not the fields in the strongly typed objects. Use [[unionByName]] to resolve
+   * columns by field name in the typed objects.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def union(other: Dataset[T]): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_UNION) { builder =>
+      builder.setIsAll(true)
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing union of rows in this Dataset and another Dataset. This is
+   * an alias for `union`.
+   *
+   * This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union (that does
+   * deduplication of elements), use this function followed by a [[distinct]].
+   *
+   * Also as standard in SQL, this function resolves columns by position (not by name).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def unionAll(other: Dataset[T]): Dataset[T] = union(other)
+
+  /**
+   * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
+   *
+   * This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+   * union (that does deduplication of elements), use this function followed by a [[distinct]].
+   *
+   * The difference between this function and [[union]] is that this function resolves columns by
+   * name (not by position):
+   *
+   * {{{
+   *   val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+   *   val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+   *   df1.unionByName(df2).show
+   *
+   *   // output:
+   *   // +----+----+----+
+   *   // |col0|col1|col2|
+   *   // +----+----+----+
+   *   // |   1|   2|   3|
+   *   // |   6|   4|   5|
+   *   // +----+----+----+
+   * }}}
+   *
+   * Note that this supports nested columns in struct and array types. Nested columns in map types
+   * are not currently supported.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def unionByName(other: Dataset[T]): Dataset[T] = unionByName(other, allowMissingColumns = false)
+
+  /**
+   * Returns a new Dataset containing union of rows in this Dataset and another Dataset.
+   *
+   * The difference between this function and [[union]] is that this function resolves columns by
+   * name (not by position).
+   *
+   * When the parameter `allowMissingColumns` is `true`, the set of column names in this and other
+   * `Dataset` can differ; missing columns will be filled with null. Further, the missing columns
+   * of this `Dataset` will be added at the end in the schema of the union result:
+   *
+   * {{{
+   *   val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+   *   val df2 = Seq((4, 5, 6)).toDF("col1", "col0", "col3")
+   *   df1.unionByName(df2, true).show
+   *
+   *   // output: "col3" is missing from df1 (the left side) and is added at the end of the schema.
+   *   // +----+----+----+----+
+   *   // |col0|col1|col2|col3|
+   *   // +----+----+----+----+
+   *   // |   1|   2|   3|null|
+   *   // |   5|   4|null|   6|
+   *   // +----+----+----+----+
+   *
+   *   df2.unionByName(df1, true).show
+   *
+   *   // output: "col2" is missing from df2 (the left side) and is added at the end of the schema.
+   *   // +----+----+----+----+
+   *   // |col1|col0|col3|col2|
+   *   // +----+----+----+----+
+   *   // |   4|   5|   6|null|
+   *   // |   2|   1|null|   3|
+   *   // +----+----+----+----+
+   * }}}
+   *
+   * Note that this supports nested columns in struct and array types. With `allowMissingColumns`,
+   * missing nested columns of struct columns with the same name will also be filled with null
+   * values and added to the end of struct. Nested columns in map types are not currently
+   * supported.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_UNION) { builder =>
+      builder.setByName(true).setIsAll(true).setAllowMissingColumns(allowMissingColumns)
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is
+   * equivalent to `INTERSECT` in SQL.
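+   *
+   * An illustrative sketch (assumes the `toDF` helpers used in the other examples are available):
+   * {{{
+   *   val df1 = Seq(1, 2, 3, 3).toDF("id")
+   *   val df2 = Seq(3, 3, 4).toDF("id")
+   *   // Keeps only values present in both Datasets, with duplicates removed: a single row (3).
+   *   df1.intersect(df2)
+   * }}}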
+   *
+   * @note
+   *   Equality checking is performed directly on the encoded representation of the data and thus
+   *   is not affected by a custom `equals` function defined on `T`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def intersect(other: Dataset[T]): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT) { builder =>
+      builder.setIsAll(false)
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing rows only in both this Dataset and another Dataset while
+   * preserving the duplicates. This is equivalent to `INTERSECT ALL` in SQL.
+   *
+   * @note
+   *   Equality checking is performed directly on the encoded representation of the data and thus
+   *   is not affected by a custom `equals` function defined on `T`. Also as standard in SQL, this
+   *   function resolves columns by position (not by name).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def intersectAll(other: Dataset[T]): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT) { builder =>
+      builder.setIsAll(true)
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing rows in this Dataset but not in another Dataset. This is
+   * equivalent to `EXCEPT DISTINCT` in SQL.
+   *
+   * @note
+   *   Equality checking is performed directly on the encoded representation of the data and thus
+   *   is not affected by a custom `equals` function defined on `T`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def except(other: Dataset[T]): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT) { builder =>
+      builder.setIsAll(false)
+    }
+  }
+
+  /**
+   * Returns a new Dataset containing rows in this Dataset but not in another Dataset while
+   * preserving the duplicates. This is equivalent to `EXCEPT ALL` in SQL.
+   *
+   * @note
+   *   Equality checking is performed directly on the encoded representation of the data and thus
+   *   is not affected by a custom `equals` function defined on `T`. Also as standard in SQL, this
+   *   function resolves columns by position (not by name).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def exceptAll(other: Dataset[T]): Dataset[T] = {
+    buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT) { builder =>
+      builder.setIsAll(true)
+    }
+  }
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a
+   * user-supplied seed.
+   *
+   * @param fraction
+   *   Fraction of rows to generate, range [0.0, 1.0].
+   * @param seed
+   *   Seed for sampling.
+   *
+   * @note
+   *   This is NOT guaranteed to provide exactly the fraction of the count of the given
+   *   [[Dataset]].
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def sample(fraction: Double, seed: Long): Dataset[T] = {
+    sample(withReplacement = false, fraction = fraction, seed = seed)
+  }
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a
+   * random seed.
+   *
+   * @param fraction
+   *   Fraction of rows to generate, range [0.0, 1.0].
+   *
+   * @note
+   *   This is NOT guaranteed to provide exactly the fraction of the count of the given
+   *   [[Dataset]].
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def sample(fraction: Double): Dataset[T] = {
+    sample(withReplacement = false, fraction = fraction)
+  }
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of rows, using a user-supplied seed.
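+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   // Sample roughly 30% of the rows, with replacement and a fixed seed for repeatability.
+   *   ds.sample(withReplacement = true, fraction = 0.3, seed = 42L)
+   * }}}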
+   *
+   * @param withReplacement
+   *   Sample with replacement or not.
+   * @param fraction
+   *   Fraction of rows to generate, range [0.0, 1.0].
+   * @param seed
+   *   Seed for sampling.
+   *
+   * @note
+   *   This is NOT guaranteed to provide exactly the fraction of the count of the given
+   *   [[Dataset]].
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
+    session.newDataset { builder =>
+      builder.getSampleBuilder
+        .setInput(plan.getRoot)
+        .setWithReplacement(withReplacement)
+        .setLowerBound(0.0d)
+        .setUpperBound(fraction)
+        .setSeed(seed)
+    }
+  }
+
+  /**
+   * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed.
+   *
+   * @param withReplacement
+   *   Sample with replacement or not.
+   * @param fraction
+   *   Fraction of rows to generate, range [0.0, 1.0].
+   *
+   * @note
+   *   This is NOT guaranteed to provide exactly the fraction of the total count of the given
+   *   [[Dataset]].
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
+    sample(withReplacement, fraction, Utils.random.nextLong)
+  }
+
+  /**
+   * Randomly splits this Dataset with the provided weights.
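+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   // Split into roughly 70%/30% train and test Datasets, using a fixed seed.
+   *   val Array(train, test) = ds.randomSplit(Array(0.7, 0.3), 42L)
+   * }}}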
+   *
+   * @param weights
+   *   weights for splits, will be normalized if they don't sum to 1.
+   * @param seed
+   *   Seed for sampling.
+   *
+   * For Java API, use [[randomSplitAsList]].
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
+    require(
+      weights.forall(_ >= 0),
+      s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
+    require(
+      weights.sum > 0,
+      s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
+
+    // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+    // constituent partitions each time a split is materialized which could result in
+    // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+    // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+    // from the sort order.
+    // TODO we need to have a proper way of stabilizing the input data. The current approach does
+    //  not work well with spark connects' extremely lazy nature. When the schema is modified
+    //  between construction and execution the query might fail or produce wrong results. Another
+    //  problem can come from data that arrives between the execution of the returned datasets.
+    val sortOrder = schema.collect {
+      case f if RowOrdering.isOrderable(f.dataType) => col(f.name).asc
+    }
+    val sortedInput = sortWithinPartitions(sortOrder: _*).plan.getRoot
+    val sum = weights.sum
+    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
+    normalizedCumWeights
+      .sliding(2)
+      .map { case Array(low, high) =>
+        session.newDataset[T] { builder =>
+          builder.getSampleBuilder
+            .setInput(sortedInput)
+            .setWithReplacement(false)
+            .setLowerBound(low)
+            .setUpperBound(high)
+            .setSeed(seed)
+        }
+      }
+      .toArray
+  }
+
+  /**
+   * Returns a Java list that contains randomly split Dataset with the provided weights.
+   *
+   * @param weights
+   *   weights for splits, will be normalized if they don't sum to 1.
+   * @param seed
+   *   Seed for sampling.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def randomSplitAsList(weights: Array[Double], seed: Long): java.util.List[Dataset[T]] = {
+    val values = randomSplit(weights, seed)
+    java.util.Arrays.asList(values: _*)
+  }
+
+  /**
+   * Randomly splits this Dataset with the provided weights.
+   *
+   * @param weights
+   *   weights for splits, will be normalized if they don't sum to 1.
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
+    randomSplit(weights, Utils.random.nextLong)
+  }
+
+  private def withColumns(names: Seq[String], values: Seq[Column]): DataFrame = {
+    val aliases = values.zip(names).map { case (value, name) =>
+      value.name(name).expr.getAlias
+    }
+    session.newDataset { builder =>
+      builder.getWithColumnsBuilder
+        .setInput(plan.getRoot)
+        .addAllAliases(aliases.asJava)
+    }
+  }
+
+  /**
+   * Returns a new Dataset by adding a column or replacing the existing column that has the same
+   * name.
+   *
+   * `column`'s expression must only refer to attributes supplied by this Dataset. It is an error
+   * to add a column that refers to some other Dataset.
+   *
+   * @note
+   *   this method introduces a projection internally. Therefore, calling it multiple times, for
+   *   instance, via loops in order to add multiple columns can generate big plans which can cause
+   *   performance issues and even `StackOverflowException`. To avoid this, use `select` with the
+   *   multiple columns at once.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
+
+  /**
+   * (Scala-specific) Returns a new Dataset by adding columns or replacing the existing columns
+   * that have the same names.
+   *
+   * `colsMap` is a map of column name to column; the columns must only refer to attributes
+   * supplied by this Dataset. It is an error to add columns that refer to some other Dataset.
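+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with columns `a` and `b`):
+   * {{{
+   *   // Add a derived column and replace an existing one in a single projection.
+   *   ds.withColumns(Map("a_plus_one" -> ($"a" + 1), "b" -> ($"b" * 2)))
+   * }}}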
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withColumns(colsMap: Map[String, Column]): DataFrame = {
+    val (colNames, newCols) = colsMap.toSeq.unzip
+    withColumns(colNames, newCols)
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset by adding columns or replacing the existing columns
+   * that have the same names.
+   *
+   * `colsMap` is a map of column name to column; the columns must only refer to attributes
+   * supplied by this Dataset. It is an error to add columns that refer to some other Dataset.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withColumns(colsMap: java.util.Map[String, Column]): DataFrame = withColumns(
+    colsMap.asScala.toMap)
+
+  /**
+   * Returns a new Dataset with a column renamed. This is a no-op if schema doesn't contain
+   * existingName.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withColumnRenamed(existingName: String, newName: String): DataFrame = {
+    withColumnsRenamed(Collections.singletonMap(existingName, newName))
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset with columns renamed. This is a no-op if the schema
+   * doesn't contain the existing column names.
+   *
+   * `colsMap` is a map of existing column name and new column name.
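+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with columns `a` and `b`):
+   * {{{
+   *   ds.withColumnsRenamed(Map("a" -> "first", "b" -> "second"))
+   * }}}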
+   *
+   * @throws AnalysisException
+   *   if there are duplicate names in resulting projection
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @throws[AnalysisException]
+  def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = {
+    withColumnsRenamed(colsMap.asJava)
+  }
+
+  /**
+   * (Java-specific) Returns a new Dataset with columns renamed. This is a no-op if the schema
+   * doesn't contain the existing column names.
+   *
+   * `colsMap` is a map of existing column name and new column name.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame = {
+    session.newDataset { builder =>
+      builder.getWithColumnsRenamedBuilder
+        .setInput(plan.getRoot)
+        .putAllRenameColumnsMap(colsMap)
+    }
+  }
+
+  /**
+   * Returns a new Dataset by updating an existing column with metadata.
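+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with a column `id`):
+   * {{{
+   *   import org.apache.spark.sql.types.MetadataBuilder
+   *   val metadata = new MetadataBuilder().putString("comment", "unique row id").build()
+   *   ds.withMetadata("id", metadata)
+   * }}}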
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def withMetadata(columnName: String, metadata: Metadata): DataFrame = {
+    val newAlias = proto.Expression.Alias
+      .newBuilder()
+      .setExpr(col(columnName).expr)
+      .addName(columnName)
+      .setMetadata(metadata.json)
+    session.newDataset { builder =>
+      builder.getWithColumnsBuilder
+        .setInput(plan.getRoot)
+        .addAliases(newAlias)
+    }
+  }
+
+  /**
+   * Returns a new Dataset with a column dropped. This is a no-op if the schema doesn't contain
+   * the column name.
+   *
+   * This method can only be used to drop top level columns. The colName string is treated
+   * literally without further interpretation.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def drop(colName: String): DataFrame = {
+    drop(functions.col(colName))
+  }
+
+  /**
+   * Returns a new Dataset with columns dropped. This is a no-op if the schema doesn't contain the
+   * column name(s).
+   *
+   * This method can only be used to drop top level columns. The colName string is treated
+   * literally without further interpretation.
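+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with columns `colA` and `colB`):
+   * {{{
+   *   ds.drop("colA", "colB")
+   * }}}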
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def drop(colNames: String*): DataFrame = buildDrop(colNames.map(functions.col))
+
+  /**
+   * Returns a new Dataset with column dropped.
+   *
+   * This method can only be used to drop a top level column. This version of drop accepts a
+   * [[Column]] rather than a name. This is a no-op if the Dataset doesn't have a column with an
+   * equivalent expression.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  def drop(col: Column): DataFrame = {
+    buildDrop(col :: Nil)
+  }
+
+  /**
+   * Returns a new Dataset with columns dropped.
+   *
+   * This method can only be used to drop top level columns. This is a no-op if the Dataset
+   * doesn't have columns with equivalent expressions.
+   *
+   * @group untypedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def drop(col: Column, cols: Column*): DataFrame = buildDrop(col +: cols)
+
+  private def buildDrop(cols: Seq[Column]): DataFrame = session.newDataset { builder =>
+    builder.getDropBuilder
+      .setInput(plan.getRoot)
+      .addAllCols(cols.map(_.expr).asJava)
+  }
+
+  /**
+   * Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias
+   * for `distinct`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def dropDuplicates(): Dataset[T] = session.newDataset { builder =>
+    builder.getDeduplicateBuilder
+      .setInput(plan.getRoot)
+      .setAllColumnsAsKeys(true)
+  }
+
+  /**
+   * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the
+   * subset of columns.
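+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with columns `country` and
+   * `city`):
+   * {{{
+   *   // Keep a single row per (country, city) combination.
+   *   ds.dropDuplicates(Seq("country", "city"))
+   * }}}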
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def dropDuplicates(colNames: Seq[String]): Dataset[T] = session.newDataset { builder =>
+    builder.getDeduplicateBuilder
+      .setInput(plan.getRoot)
+      .addAllColumnNames(colNames.asJava)
+  }
+
+  /**
+   * Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
+
+  /**
+   * Returns a new [[Dataset]] with duplicate rows removed, considering only the subset of
+   * columns.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
+    val colNames: Seq[String] = col1 +: cols
+    dropDuplicates(colNames)
+  }
+
+  /**
+   * Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
+   * and max. If no columns are given, this function computes statistics for all numerical or
+   * string columns.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting Dataset. If you want to
+   * programmatically compute summary statistics, use the `agg` function instead.
+   *
+   * {{{
+   *   ds.describe("age", "height").show()
+   *
+   *   // output:
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // max     92.0  192.0
+   * }}}
+   *
+   * Use [[summary]] for expanded statistics and control over which statistics to compute.
+   *
+   * @param cols
+   *   Columns to compute statistics on.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def describe(cols: String*): DataFrame = session.newDataset { builder =>
+    builder.getDescribeBuilder
+      .setInput(plan.getRoot)
+      .addAllCols(cols.asJava)
+  }
+
+  /**
+   * Computes specified statistics for numeric and string columns. Available statistics are: <ul>
+   * <li>count</li> <li>mean</li> <li>stddev</li> <li>min</li> <li>max</li> <li>arbitrary
+   * approximate percentiles specified as a percentage (e.g. 75%)</li> <li>count_distinct</li>
+   * <li>approx_count_distinct</li> </ul>
+   *
+   * If no statistics are given, this function computes count, mean, stddev, min, approximate
+   * quartiles (percentiles at 25%, 50%, and 75%), and max.
+   *
+   * This function is meant for exploratory data analysis, as we make no guarantee about the
+   * backward compatibility of the schema of the resulting Dataset. If you want to
+   * programmatically compute summary statistics, use the `agg` function instead.
+   *
+   * {{{
+   *   ds.summary().show()
+   *
+   *   // output:
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // mean    53.3  178.05
+   *   // stddev  11.6  15.7
+   *   // min     18.0  163.0
+   *   // 25%     24.0  176.0
+   *   // 50%     24.0  176.0
+   *   // 75%     32.0  180.0
+   *   // max     92.0  192.0
+   * }}}
+   *
+   * {{{
+   *   ds.summary("count", "min", "25%", "75%", "max").show()
+   *
+   *   // output:
+   *   // summary age   height
+   *   // count   10.0  10.0
+   *   // min     18.0  163.0
+   *   // 25%     24.0  176.0
+   *   // 75%     32.0  180.0
+   *   // max     92.0  192.0
+   * }}}
+   *
+   * To do a summary for specific columns first select them:
+   *
+   * {{{
+   *   ds.select("age", "height").summary().show()
+   * }}}
+   *
+   * Specify statistics to output custom summaries:
+   *
+   * {{{
+   *   ds.summary("count", "count_distinct").show()
+   * }}}
+   *
+   * The distinct count isn't included by default.
+   *
+   * You can also run approximate distinct counts which are faster:
+   *
+   * {{{
+   *   ds.summary("count", "approx_count_distinct").show()
+   * }}}
+   *
+   * See also [[describe]] for basic statistics.
+   *
+   * @param statistics
+   *   Statistics from above list to be computed.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def summary(statistics: String*): DataFrame = session.newDataset { builder =>
+    builder.getSummaryBuilder
+      .setInput(plan.getRoot)
+      .addAllStatistics(statistics.asJava)
+  }
+
+  /**
+   * Returns the first `n` rows.
+   *
+   * @note
+   *   this method should only be used if the resulting array is expected to be small, as all the
+   *   data is loaded into the driver's memory.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def head(n: Int): Array[T] = limit(n).collect()
+
+  /**
+   * Returns the first row.
+   * @group action
+   * @since 3.4.0
+   */
+  def head(): T = head(1).head
+
+  /**
+   * Returns the first row. Alias for head().
+   * @group action
+   * @since 3.4.0
+   */
+  def first(): T = head()
+
+  /**
+   * Concise syntax for chaining custom transformations.
+   * {{{
+   *   def featurize(ds: Dataset[T]): Dataset[U] = ...
+   *
+   *   ds
+   *     .transform(featurize)
+   *     .transform(...)
+   * }}}
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
+
+  /**
+   * Returns the first `n` rows in the Dataset.
+   *
+   * Running take requires moving data into the application's driver process, and doing so with a
+   * very large `n` can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def take(n: Int): Array[T] = head(n)
+
+  /**
+   * Returns the last `n` rows in the Dataset.
+   *
+   * Running tail requires moving data into the application's driver process, and doing so with a
+   * very large `n` can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def tail(n: Int): Array[T] = {
+    val lastN = session.newDataset[T] { builder =>
+      builder.getTailBuilder
+        .setInput(plan.getRoot)
+        .setLimit(n)
+    }
+    lastN.collect()
+  }
+
+  /**
+   * Returns the first `n` rows in the Dataset as a list.
+   *
+   * Running take requires moving data into the application's driver process, and doing so with a
+   * very large `n` can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n): _*)
+
+  /**
+   * Returns an array that contains all rows in this Dataset.
+   *
+   * Running collect requires moving all the data into the application's driver process, and doing
+   * so on a very large dataset can crash the driver process with OutOfMemoryError.
+   *
+   * For Java API, use [[collectAsList]].
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def collect(): Array[T] = withResult { result =>
+    result.toArray.asInstanceOf[Array[T]]
+  }
+
+  /**
+   * Returns a Java list that contains all rows in this Dataset.
+   *
+   * Running collect requires moving all the data into the application's driver process, and doing
+   * so on a very large dataset can crash the driver process with OutOfMemoryError.
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def collectAsList(): java.util.List[T] = {
+    java.util.Arrays.asList(collect(): _*)
+  }
+
+  /**
+   * Returns an iterator that contains all rows in this Dataset.
+   *
+   * The returned iterator implements [[AutoCloseable]]. For memory management it is better to
+   * close it once you are done. If you don't close it, it and the underlying data will be cleaned
+   * up once the iterator is garbage collected.
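+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   val iterator = ds.toLocalIterator()
+   *   try {
+   *     while (iterator.hasNext) {
+   *       println(iterator.next())
+   *     }
+   *   } finally {
+   *     // The returned iterator implements AutoCloseable; close it to release resources early.
+   *     iterator.asInstanceOf[AutoCloseable].close()
+   *   }
+   * }}}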
+   *
+   * @group action
+   * @since 3.4.0
+   */
+  def toLocalIterator(): java.util.Iterator[T] = {
+    // TODO make this a destructive iterator.
+    collectResult().iterator.asInstanceOf[java.util.Iterator[T]]
+  }
+
+  private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
+    session.newDataset { builder =>
+      builder.getRepartitionBuilder
+        .setInput(plan.getRoot)
+        .setNumPartitions(numPartitions)
+        .setShuffle(shuffle)
+    }
+  }
+
+  private def buildRepartitionByExpression(
+      numPartitions: Option[Int],
+      partitionExprs: Seq[Column]): Dataset[T] = session.newDataset { builder =>
+    val repartitionBuilder = builder.getRepartitionByExpressionBuilder
+      .setInput(plan.getRoot)
+      .addAllPartitionExprs(partitionExprs.map(_.expr).asJava)
+    numPartitions.foreach(repartitionBuilder.setNumPartitions)
+  }
+
+  /**
+   * Returns a new Dataset that has exactly `numPartitions` partitions.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def repartition(numPartitions: Int): Dataset[T] = {
+    buildRepartition(numPartitions, shuffle = true)
+  }
+
+  private def repartitionByExpression(
+      numPartitions: Option[Int],
+      partitionExprs: Seq[Column]): Dataset[T] = {
+    // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
+    // However, we don't want to complicate the semantics of this API method.
+    // Instead, let's give users a friendly error message, pointing them to the new method.
+    val sortOrders = partitionExprs.filter(_.expr.hasSortOrder)
+    if (sortOrders.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Invalid partitionExprs specified: $sortOrders\n" +
+          s"For range partitioning use repartitionByRange(...) instead.")
+    }
+    buildRepartitionByExpression(numPartitions, partitionExprs)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into `numPartitions`.
+   * The resulting Dataset is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+    repartitionByExpression(Some(numPartitions), partitionExprs)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions, using
+   * `spark.sql.shuffle.partitions` as number of partitions. The resulting Dataset is hash
+   * partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def repartition(partitionExprs: Column*): Dataset[T] = {
+    repartitionByExpression(None, partitionExprs)
+  }
+
+  private def repartitionByRange(
+      numPartitions: Option[Int],
+      partitionExprs: Seq[Column]): Dataset[T] = {
+    require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
+    val sortExprs = partitionExprs.map {
+      case e if e.expr.hasSortOrder => e
+      case e => e.asc
+    }
+    buildRepartitionByExpression(numPartitions, sortExprs)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions into `numPartitions`.
+   * The resulting Dataset is range partitioned.
+   *
+   * At least one partition-by expression must be specified. When no explicit sort order is
+   * specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each
+   * partition of the resulting Dataset.
+   *
+   * Note that due to performance reasons this method uses sampling to estimate the ranges. Hence,
+   * the output may not be consistent, since sampling can return different values. The sample size
+   * can be controlled by the config `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
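+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds` with a column `id`):
+   * {{{
+   *   // Range partition by "id" into 8 partitions; "ascending nulls first" ordering is assumed.
+   *   ds.repartitionByRange(8, $"id")
+   * }}}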
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+    repartitionByRange(Some(numPartitions), partitionExprs)
+  }
+
+  /**
+   * Returns a new Dataset partitioned by the given partitioning expressions, using
+   * `spark.sql.shuffle.partitions` as number of partitions. The resulting Dataset is range
+   * partitioned.
+   *
+   * At least one partition-by expression must be specified. When no explicit sort order is
+   * specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each
+   * partition of the resulting Dataset.
+   *
+   * Note that due to performance reasons this method uses sampling to estimate the ranges. Hence,
+   * the output may not be consistent, since sampling can return different values. The sample size
+   * can be controlled by the config `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  @scala.annotation.varargs
+  def repartitionByRange(partitionExprs: Column*): Dataset[T] = {
+    repartitionByRange(None, partitionExprs)
+  }
+
+  /**
+   * Returns a new Dataset that has exactly `numPartitions` partitions, when fewer partitions
+   * are requested. If a larger number of partitions is requested, it will stay at the current
+   * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in a
+   * narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a
+   * shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
+   *
+   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in
+   * your computation taking place on fewer nodes than you like (e.g. one node in the case of
+   * numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step,
+   * but means the current upstream partitions will be executed in parallel (per whatever the
+   * current partitioning is).
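+   *
+   * A minimal usage sketch (illustrative; assumes a Dataset `ds`):
+   * {{{
+   *   // Reduce to a single partition without a shuffle, e.g. before writing a single output file.
+   *   ds.coalesce(1)
+   * }}}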
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def coalesce(numPartitions: Int): Dataset[T] = {
+    buildRepartition(numPartitions, shuffle = false)
+  }
+
+  /**
+   * Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias
+   * for `dropDuplicates`.
+   *
+   * Note that for a streaming [[Dataset]], this method returns distinct rows only once regardless
+   * of the output mode, so the behavior may not be the same as `DISTINCT` in SQL against a
+   * streaming [[Dataset]].
+   *
+   * @note
+   *   Equality checking is performed directly on the encoded representation of the data and thus
+   *   is not affected by a custom `equals` function defined on `T`.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def distinct(): Dataset[T] = dropDuplicates()
+
+  /**
+   * Returns a best-effort snapshot of the files that compose this Dataset. This method simply
+   * asks each constituent BaseRelation for its respective files and takes the union of all
+   * results. Depending on the source relations, this may not find all input files. Duplicates are
+   * removed.
+   *
+   * @group basic
+   * @since 3.4.0
+   */
+  def inputFiles: Array[String] = analyze.getInputFilesList.asScala.toArray
+
+  private[sql] def analyze: proto.AnalyzePlanResponse = {
+    session.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+  }
+
+  def collectResult(): SparkResult = session.execute(plan)
+
+  private def withResult[E](f: SparkResult => E): E = {
+    val result = collectResult()
+    try f(result)
+    finally {
+      result.close()
+    }
+  }
 }
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 55452c6df40..7d6597bf6d5 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -68,7 +68,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
    *
    * @since 3.4.0
    */
-  def range(end: Long): Dataset[java.lang.Long] = range(0, end)
+  def range(end: Long): Dataset[Row] = range(0, end)
 
   /**
    * Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
@@ -76,7 +76,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
    *
    * @since 3.4.0
    */
-  def range(start: Long, end: Long): Dataset[java.lang.Long] = {
+  def range(start: Long, end: Long): Dataset[Row] = {
     range(start, end, step = 1)
   }
 
@@ -86,7 +86,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
    *
    * @since 3.4.0
    */
-  def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
+  def range(start: Long, end: Long, step: Long): Dataset[Row] = {
     range(start, end, step, None)
   }
 
@@ -96,7 +96,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
    *
    * @since 3.4.0
    */
-  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
+  def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Row] = {
     range(start, end, step, Option(numPartitions))
   }
 
@@ -104,7 +104,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
       start: Long,
       end: Long,
       step: Long,
-      numPartitions: Option[Int]): Dataset[java.lang.Long] = {
+      numPartitions: Option[Int]): Dataset[Row] = {
     newDataset { builder =>
       val rangeBuilder = builder.getRangeBuilder
         .setStart(start)
@@ -121,8 +121,10 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
     new Dataset[T](this, plan)
   }
 
-  private[sql] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse =
-    client.analyze(plan)
+  private[sql] def analyze(
+      plan: proto.Plan,
+      mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse =
+    client.analyze(plan, mode)
 
   private[sql] def execute(plan: proto.Plan): SparkResult = {
     val value = client.execute(plan)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 8252f8aef76..3049a0a0a5d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -70,10 +70,11 @@ class SparkConnectClient(
    * @return
    *   A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
    */
-  def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+  def analyze(plan: proto.Plan, mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse = {
     val request = proto.AnalyzePlanRequest
       .newBuilder()
       .setPlan(plan)
+      .setExplain(proto.Explain.newBuilder().setExplainMode(mode))
       .setUserContext(userContext)
       .setClientId(sessionId)
       .build()
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 6332ed8c734..4b4bedaf659 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -92,6 +92,20 @@ object functions {
     }
   }
 
+  /**
+   * Parses the expression string into the column that it represents, similar to
+   * [[Dataset#selectExpr]].
+   * {{{
+   *   // get the number of words of each length
+   *   df.groupBy(expr("length(word)")).count()
+   * }}}
+   *
+   * @group normal_funcs
+   */
+  def expr(expr: String): Column = Column { builder =>
+    builder.getExpressionStringBuilder.setExpression(expr)
+  }
+
   // scalastyle:off line.size.limit
 
   /**
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index db2b8b26987..a0dd4746a8b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -16,6 +16,13 @@
  */
 package org.apache.spark.sql
 
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalactic.TolerantNumerics
+
 import org.apache.spark.sql.connect.client.util.RemoteSparkSession
 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -23,13 +30,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 class ClientE2ETestSuite extends RemoteSparkSession {
 
   // Spark Result
-  test("test spark result schema") {
+  test("spark result schema") {
     val df = spark.sql("select val from (values ('Hello'), ('World')) as t(val)")
     val schema = df.collectResult().schema
     assert(schema == StructType(StructField("val", StringType, false) :: Nil))
   }
 
-  test("test spark result array") {
+  test("spark result array") {
     val df = spark.sql("select val from (values ('Hello'), ('World')) as t(val)")
     val result = df.collectResult()
     assert(result.length == 2)
@@ -39,7 +46,7 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(array(1).getString(0) == "World")
   }
 
-  test("simple dataset test") {
+  test("simple dataset") {
     val df = spark.range(10).limit(3)
     val result = df.collectResult()
     assert(result.length == 3)
@@ -49,7 +56,7 @@ class ClientE2ETestSuite extends RemoteSparkSession {
     assert(array(2).getLong(0) == 2)
   }
 
-  test("simple udf test") {
+  test("simple udf") {
 
     def dummyUdf(x: Int): Int = x + 5
     val myUdf = udf(dummyUdf _)
@@ -64,4 +71,173 @@ class ClientE2ETestSuite extends RemoteSparkSession {
 
   // TODO test large result when we can create table or view
   // test("test spark large result")
+  private def captureStdOut(block: => Unit): String = {
+    val currentOut = Console.out
+    val capturedOut = new ByteArrayOutputStream()
+    val newOut = new PrintStream(new TeeOutputStream(currentOut, capturedOut))
+    Console.withOut(newOut) {
+      block
+    }
+    capturedOut.toString
+  }
+
+  private def checkFragments(result: String, fragmentsToCheck: Seq[String]): Unit = {
+    fragmentsToCheck.foreach { fragment =>
+      assert(result.contains(fragment))
+    }
+  }
+
+  private def testCapturedStdOut(block: => Unit, fragmentsToCheck: String*): Unit = {
+    checkFragments(captureStdOut(block), fragmentsToCheck)
+  }
+
+  private def testCapturedStdOut(
+      block: => Unit,
+      expectedNumLines: Int,
+      expectedMaxWidth: Int,
+      fragmentsToCheck: String*): Unit = {
+    val result = captureStdOut(block)
+    val lines = result.split('\n')
+    assert(lines.length === expectedNumLines)
+    assert(lines.map((s: String) => s.length).max <= expectedMaxWidth)
+    checkFragments(result, fragmentsToCheck)
+  }
+
+  private val simpleSchema = new StructType().add("id", "long", nullable = false)
+
+  // Dataset tests
+  test("Dataset inspection") {
+    val df = spark.range(10)
+    val local = spark.newDataset { builder =>
+      builder.getLocalRelationBuilder.setSchema(simpleSchema.catalogString)
+    }
+    assert(!df.isLocal)
+    assert(local.isLocal)
+    assert(!df.isStreaming)
+    assert(df.toString.contains("[id: bigint]"))
+    assert(df.inputFiles.isEmpty)
+  }
+
+  test("Dataset schema") {
+    val df = spark.range(10)
+    assert(df.schema === simpleSchema)
+    assert(df.dtypes === Array(("id", "LongType")))
+    assert(df.columns === Array("id"))
+    testCapturedStdOut(df.printSchema(), simpleSchema.treeString)
+    testCapturedStdOut(df.printSchema(5), simpleSchema.treeString(5))
+  }
+
+  test("Dataset explain") {
+    val df = spark.range(10)
+    val simpleExplainFragments = Seq("== Physical Plan ==")
+    testCapturedStdOut(df.explain(), simpleExplainFragments: _*)
+    testCapturedStdOut(df.explain(false), simpleExplainFragments: _*)
+    testCapturedStdOut(df.explain("simple"), simpleExplainFragments: _*)
+    val extendedExplainFragments = Seq(
+      "== Parsed Logical Plan ==",
+      "== Analyzed Logical Plan ==",
+      "== Optimized Logical Plan ==") ++
+      simpleExplainFragments
+    testCapturedStdOut(df.explain(true), extendedExplainFragments: _*)
+    testCapturedStdOut(df.explain("extended"), extendedExplainFragments: _*)
+    testCapturedStdOut(
+      df.explain("cost"),
+      simpleExplainFragments :+ "== Optimized Logical Plan ==": _*)
+    testCapturedStdOut(df.explain("codegen"), "WholeStageCodegen subtrees.")
+    testCapturedStdOut(df.explain("formatted"), "Range", "Arguments: ")
+  }
+
+  test("Dataset result collection") {
+    def checkResult(rows: TraversableOnce[Row], expectedValues: Long*): Unit = {
+      rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
+        case (actual, expected) => assert(actual.getLong(0) === expected)
+      }
+    }
+    val df = spark.range(10)
+    checkResult(df.head() :: Nil, 0L)
+    checkResult(df.head(5), 0L, 1L, 2L, 3L, 4L)
+    checkResult(df.first() :: Nil, 0L)
+    assert(!df.isEmpty)
+    assert(df.filter("id > 100").isEmpty)
+    checkResult(df.take(3), 0L, 1L, 2L)
+    checkResult(df.tail(3), 7L, 8L, 9L)
+    checkResult(df.takeAsList(10).asScala, 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)
+    checkResult(df.filter("id % 3 = 0").collect(), 0L, 3L, 6L, 9L)
+    checkResult(df.filter("id < 3").collectAsList().asScala, 0L, 1L, 2L)
+    val iterator = df.filter("id > 5 and id < 9").toLocalIterator()
+    try {
+      checkResult(iterator.asScala, 6L, 7L, 8L)
+    } finally {
+      iterator.asInstanceOf[AutoCloseable].close()
+    }
+  }
+
+  test("Dataset show") {
+    val df = spark.range(20)
+    testCapturedStdOut(df.show(), 24, 5, "+---+", "| id|", "|  0|", "| 19|")
+    testCapturedStdOut(
+      df.show(10),
+      15,
+      24,
+      "+---+",
+      "| id|",
+      "|  0|",
+      "|  9|",
+      "only showing top 10 rows")
+    val wideDf =
+      spark.range(4).selectExpr("id", "concat('very_very_very_long_string', id) as val")
+    testCapturedStdOut(
+      wideDf.show(true),
+      8,
+      26,
+      "+---+--------------------+",
+      "| id|                 val|",
+      "|  0|very_very_very_lo...|")
+    testCapturedStdOut(
+      wideDf.show(false),
+      8,
+      33,
+      "+---+---------------------------+",
+      "|id |val                        |",
+      "|2  |very_very_very_long_string2|")
+    testCapturedStdOut(
+      wideDf.show(2, truncate = false),
+      7,
+      33,
+      "+---+---------------------------+",
+      "|id |val                        |",
+      "|1  |very_very_very_long_string1|",
+      "only showing top 2 rows")
+    testCapturedStdOut(
+      df.show(8, 10, vertical = true),
+      17,
+      23,
+      "-RECORD 3--",
+      "id  | 7",
+      "only showing top 8 rows")
+  }
+
+  test("Dataset randomSplit") {
+    implicit val tolerance = TolerantNumerics.tolerantDoubleEquality(0.01)
+
+    val df = spark.range(100)
+    def checkSample(ds: DataFrame, lower: Double, upper: Double, seed: Long): Unit = {
+      assert(ds.plan.getRoot.hasSample)
+      val sample = ds.plan.getRoot.getSample
+      assert(sample.getSeed === seed)
+      assert(sample.getLowerBound === lower)
+      assert(sample.getUpperBound === upper)
+    }
+    val Array(ds1, ds2, ds3) = df.randomSplit(Array(8, 9, 7), 123L)
+    checkSample(ds1, 0, 8.0 / 24.0, 123L)
+    checkSample(ds2, 8.0 / 24.0, 17.0 / 24.0, 123L)
+    checkSample(ds3, 17.0 / 24.0, 1.0, 123L)
+
+    val datasets = df.randomSplitAsList(Array(1, 2, 3, 4), 9L)
+    assert(datasets.size() === 4)
+    checkSample(datasets.get(0), 0, 1.0 / 10.0, 9L)
+    checkSample(datasets.get(1), 1.0 / 10.0, 3.0 / 10.0, 9L)
+    checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
+    checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5e7cc9f4b6d..087dcbb360a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -27,6 +27,10 @@ import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
 
+// Add sample tests.
+// - sample fraction: simple.sample(0.1)
+// - sample withReplacement_fraction: simple.sample(withReplacement = true, 0.11)
+// Add tests for exceptions thrown
 class DatasetSuite
     extends AnyFunSuite // scalastyle:ignore funsuite
     with BeforeAndAfterEach {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 494c497c553..28ac12928fd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{functions => fn}
 import org.apache.spark.sql.connect.client.SparkConnectClient
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
 
 // scalastyle:off
 /**
@@ -153,18 +153,25 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     }
   }
 
+  private def createLocalRelation(schema: StructType): DataFrame = session.newDataset { builder =>
+    // TODO API is not consistent. Now we have two different ways of working with schemas!
+    builder.getLocalRelationBuilder.setSchema(schema.catalogString)
+  }
+
   private val simpleSchema = new StructType()
     .add("id", "long")
     .add("a", "int")
     .add("b", "double")
 
-  private val simpleSchemaString = simpleSchema.catalogString
+  private val otherSchema = new StructType()
+    .add("a", "int")
+    .add("id", "long")
+    .add("payload", "binary")
 
-  // We manually construct a simple empty data frame.
-  private def simple = session.newDataset { builder =>
-    // TODO API is not consistent. Now we have two different ways of working with schemas!
-    builder.getLocalRelationBuilder.setSchema(simpleSchemaString)
-  }
+  // A few helper dataframes.
+  private def simple: DataFrame = createLocalRelation(simpleSchema)
+  private def left: DataFrame = simple
+  private def right: DataFrame = createLocalRelation(otherSchema)
 
   private def select(cs: Column*): DataFrame = simple.select(cs: _*)
 
@@ -190,6 +197,310 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
     simple.filter(fn.col("id") === fn.lit(10L))
   }
 
+  test("toDF") {
+    simple.toDF("x1", "x2", "x3")
+  }
+
+  test("to") {
+    simple.to(
+      new StructType()
+        .add("b", "double")
+        .add("id", "int"))
+  }
+
+  test("join inner_no_condition") {
+    left.join(right)
+  }
+
+  test("join inner_using_single_col") {
+    left.join(right, "id")
+  }
+
+  test("join inner_using_multiple_col_array") {
+    left.join(right, Array("id", "a"))
+  }
+
+  test("join inner_using_multiple_col_seq") {
+    left.join(right, Seq("id", "a"))
+  }
+
+  test("join using_single_col") {
+    left.join(right, "id", "left_semi")
+  }
+
+  test("join using_multiple_col_array") {
+    left.join(right, Array("id", "a"), "full_outer")
+  }
+
+  test("join using_multiple_col_seq") {
+    left.join(right, Seq("id", "a"), "right_outer")
+  }
+
+  test("join inner_condition") {
+    left.join(right, fn.col("a") === fn.col("a"))
+  }
+
+  test("join condition") {
+    left.join(right, fn.col("id") === fn.col("id"), "left_anti")
+  }
+
+  test("crossJoin") {
+    left.crossJoin(right)
+  }
+
+  test("sortWithinPartitions strings") {
+    simple.sortWithinPartitions("a", "id")
+  }
+
+  test("sortWithinPartitions columns") {
+    simple.sortWithinPartitions(fn.col("id"), fn.col("b"))
+  }
+
+  test("sort strings") {
+    simple.sort("b", "a")
+  }
+
+  test("sort columns") {
+    simple.sort(fn.col("id"), fn.col("b"))
+  }
+
+  test("orderBy strings") {
+    simple.sort("b", "id", "a")
+  }
+
+  test("orderBy columns") {
+    simple.sort(fn.col("id"), fn.col("b"), fn.col("a"))
+  }
+
+  test("apply") {
+    simple.select(simple.apply("a"))
+  }
+
+  test("hint") {
+    simple.hint("coalesce", 100)
+  }
+
+  test("col") {
+    simple.select(simple.col("id"), simple.col("b"))
+  }
+
+  test("colRegex") {
+    simple.select(simple.colRegex("a|id"))
+  }
+
+  test("as string") {
+    simple.as("foo")
+  }
+
+  test("as symbol") {
+    simple.as('bar)
+  }
+
+  test("alias string") {
+    simple.alias("fooz")
+  }
+
+  test("alias symbol") {
+    simple.alias("bob")
+  }
+
+  test("select strings") {
+    simple.select("id", "a")
+  }
+
+  test("selectExpr") {
+    simple.selectExpr("a + 10 as x", "id % 10 as grp")
+  }
+
+  test("filter expr") {
+    simple.filter("exp(a) < 10.0")
+  }
+
+  test("where column") {
+    simple.where(fn.col("id") === fn.lit(1L))
+  }
+
+  test("where expr") {
+    simple.where("a + id < 1000")
+  }
+
+  test("unpivot values") {
+    simple.unpivot(
+      ids = Array(fn.col("id"), fn.col("a")),
+      values = Array(fn.col("b")),
+      variableColumnName = "name",
+      valueColumnName = "value")
+  }
+
+  test("unpivot no_values") {
+    simple.unpivot(
+      ids = Array(fn.col("id")),
+      variableColumnName = "name",
+      valueColumnName = "value")
+  }
+
+  test("melt values") {
+    simple.melt(
+      ids = Array(fn.col("a")),
+      values = Array(fn.col("id")),
+      variableColumnName = "name",
+      valueColumnName = "value")
+  }
+
+  test("melt no_values") {
+    simple.melt(
+      ids = Array(fn.col("id"), fn.col("a")),
+      variableColumnName = "name",
+      valueColumnName = "value")
+  }
+
+  test("offset") {
+    simple.offset(1000)
+  }
+
+  test("union") {
+    simple.union(simple)
+  }
+
+  test("unionAll") {
+    simple.unionAll(simple)
+  }
+
+  test("unionByName") {
+    simple.unionByName(right)
+  }
+
+  test("unionByName allowMissingColumns") {
+    simple.unionByName(right, allowMissingColumns = true)
+  }
+
+  test("intersect") {
+    simple.intersect(simple)
+  }
+
+  test("intersectAll") {
+    simple.intersectAll(simple)
+  }
+
+  test("except") {
+    simple.except(simple)
+  }
+
+  test("exceptAll") {
+    simple.exceptAll(simple)
+  }
+
+  test("sample fraction_seed") {
+    simple.sample(0.43, 9890823L)
+  }
+
+  test("sample withReplacement_fraction_seed") {
+    simple.sample(withReplacement = true, 0.23, 898L)
+  }
+
+  test("withColumn single") {
+    simple.withColumn("z", fn.expr("a + 100"))
+  }
+
+  test("withColumns scala_map") {
+    simple.withColumns(Map(("b", fn.lit("redacted")), ("z", fn.expr("a + 100"))))
+  }
+
+  test("withColumns java_map") {
+    val map = new java.util.HashMap[String, Column]
+    map.put("g", fn.col("id"))
+    map.put("a", fn.lit("123"))
+    simple.withColumns(map)
+  }
+
+  test("withColumnRenamed single") {
+    simple.withColumnRenamed("id", "nid")
+  }
+
+  test("withColumnRenamed scala_map") {
+    simple.withColumnsRenamed(Map(("a", "alpha"), ("b", "beta")))
+  }
+
+  test("withColumnRenamed java_map") {
+    val map = new java.util.HashMap[String, String]
+    map.put("id", "nid")
+    map.put("b", "bravo")
+    simple.withColumnsRenamed(map)
+  }
+
+  test("withMetadata") {
+    val builder = new MetadataBuilder
+    builder.putString("description", "unique identifier")
+    simple.withMetadata("id", builder.build())
+  }
+
+  test("drop single string") {
+    simple.drop("a")
+  }
+
+  test("drop multiple strings") {
+    simple.drop("id", "a", "b")
+  }
+
+  test("drop single column") {
+    simple.drop(fn.col("b"))
+  }
+
+  test("drop multiple column") {
+    simple.drop(fn.col("b"), fn.col("id"))
+  }
+
+  test("dropDuplicates") {
+    simple.dropDuplicates()
+  }
+
+  test("dropDuplicates names seq") {
+    simple.dropDuplicates("a" :: "b" :: Nil)
+  }
+
+  test("dropDuplicates names array") {
+    simple.dropDuplicates(Array("a", "id"))
+  }
+
+  test("dropDuplicates varargs") {
+    simple.dropDuplicates("a", "b", "id")
+  }
+
+  test("describe") {
+    simple.describe("id", "b")
+  }
+
+  test("summary") {
+    simple.summary("mean", "min")
+  }
+
+  test("repartition") {
+    simple.repartition(24)
+  }
+
+  test("repartition num_partitions_expressions") {
+    simple.repartition(22, fn.col("a"), fn.col("id"))
+  }
+
+  test("repartition expressions") {
+    simple.repartition(fn.col("id"), fn.col("b"))
+  }
+
+  test("repartitionByRange num_partitions_expressions") {
+    simple.repartitionByRange(33, fn.col("b"), fn.col("id").desc_nulls_first)
+  }
+
+  test("repartitionByRange expressions") {
+    simple.repartitionByRange(fn.col("a").asc, fn.col("id").desc_nulls_first)
+  }
+
+  test("coalesce") {
+    simple.coalesce(5)
+  }
+
+  test("distinct") {
+    simple.distinct()
+  }
+
   /* Column API */
   test("column by name") {
     select(fn.col("b"))
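
A hedged orientation note (not part of the patch) for reading the golden files added below: each test name above maps to its resource files by replacing spaces with underscores, and the two helper schemas show up as normalized LocalRelation nodes in the expected plans. An illustrative sketch; the real naming helper lives elsewhere in this suite and is not shown in this hunk:

    // Illustrative only: test name -> golden file base name.
    def goldenName(testName: String): String = testName.replace(' ', '_')
    // e.g. goldenName("join using_single_col") == "join_using_single_col"
    //   -> query-tests/queries/join_using_single_col.json
    //   -> query-tests/queries/join_using_single_col.proto.bin
    //   -> query-tests/explain-results/join_using_single_col.explain

    // Reading the expected plans: the helper schemas appear as normalized
    // LocalRelation nodes, with a trailing L marking a bigint attribute.
    //   simpleSchema (id bigint, a int, b double)        -> LocalRelation <empty>, [none#0L, none#1, none#2]
    //   otherSchema  (a int, id bigint, payload binary)  -> LocalRelation <empty>, [none#0, none#1L, none#2]
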
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
index 760609a703f..8410073d6ec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
@@ -65,7 +65,7 @@ class SparkSessionSuite
           .build())
       .build()
     val plan = proto.Plan.newBuilder().build()
-    ss.analyze(plan)
+    ss.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
     assert(plan.equals(service.getAndClearLatestInputPlan()))
   }
 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 21eed56ee78..867995efc2e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -83,7 +83,9 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
       // Skip all shaded dependencies in the client.
       ProblemFilters.exclude[Problem]("org.sparkproject.*"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"))
+      ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+      // Disable Range until we support typed APIs
+      ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
     val problems = allProblems
       .filter { p =>
         includedRules.exists(rule => rule(p))
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
similarity index 89%
rename from connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
index 5a70fb2741a..c30ea8c8301 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
@@ -15,17 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connect.planner
+package org.apache.spark.sql.connect.common
 
 import scala.collection.convert.ImplicitConversions._
 
 import org.apache.spark.connect.proto
-import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 /**
- * This object offers methods to convert to/from connect proto to catalyst types.
+ * Helper class for conversions between [[DataType]] and [[proto.DataType]].
  */
 object DataTypeProtoConverter {
   def toCatalystType(t: proto.DataType): DataType = {
@@ -360,28 +359,4 @@ object DataTypeProtoConverter {
         throw InvalidPlanInput(s"Does not support convert ${t.typeName} to connect proto types.")
     }
   }
-
-  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode = {
-    mode match {
-      case proto.WriteOperation.SaveMode.SAVE_MODE_APPEND => SaveMode.Append
-      case proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE => SaveMode.Ignore
-      case proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE => SaveMode.Overwrite
-      case proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS => SaveMode.ErrorIfExists
-      case _ =>
-        throw new IllegalArgumentException(
-          s"Cannot convert from WriteOperaton.SaveMode to Spark SaveMode: ${mode.getNumber}")
-    }
-  }
-
-  def toSaveModeProto(mode: SaveMode): proto.WriteOperation.SaveMode = {
-    mode match {
-      case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
-      case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
-      case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
-      case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
-      case _ =>
-        throw new IllegalArgumentException(
-          s"Cannot convert from SaveMode to WriteOperation.SaveMode: ${mode.name()}")
-    }
-  }
 }
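
With the converter relocated to the shared connect common package, the server planner and the JVM client can both import it from the same place. A hedged usage sketch (not part of the patch); the proto.DataType value is assumed to come from a plan received over the wire:

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.connect.common.DataTypeProtoConverter
    import org.apache.spark.sql.types.DataType

    // Convert a data type carried in a Connect plan back to a catalyst DataType.
    // Unsupported types surface as InvalidPlanInput (thrown by the converter above).
    def toCatalyst(protoType: proto.DataType): DataType =
      DataTypeProtoConverter.toCatalystType(protoType)
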
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala
new file mode 100644
index 00000000000..0caa4122f09
--- /dev/null
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+/**
+ * Error thrown when a connect plan is not valid.
+ */
+final case class InvalidPlanInput(
+    private val message: String = "",
+    private val cause: Throwable = None.orNull)
+    extends Exception(message, cause)
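
A hedged sketch (not part of the patch) of how shared validation code can raise this error, mirroring the existing call site in DataTypeProtoConverter above:

    import org.apache.spark.sql.connect.common.InvalidPlanInput

    // Fail plan handling with InvalidPlanInput when an input cannot be processed.
    def requireSupported(condition: Boolean, message: => String): Unit = {
      if (!condition) {
        throw InvalidPlanInput(message)
      }
    }

    // Hypothetical call site:
    // requireSupported(schemaString.nonEmpty, "LocalRelation requires a non-empty schema string.")
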
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain
new file mode 100644
index 00000000000..b0d2c358215
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain
@@ -0,0 +1,2 @@
+'Project ['a]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain
new file mode 100644
index 00000000000..294fa0f73c3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain
@@ -0,0 +1,2 @@
+Repartition 5, false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain
new file mode 100644
index 00000000000..0a5d0a342db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain
@@ -0,0 +1,2 @@
+'Project ['id, 'b]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain
new file mode 100644
index 00000000000..543b1faaa99
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain
@@ -0,0 +1,2 @@
+'Project ['a|id]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain
new file mode 100644
index 00000000000..05c0d319db4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain
@@ -0,0 +1,3 @@
+'Join Cross
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
new file mode 100644
index 00000000000..3f88f836b4b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
@@ -0,0 +1,6 @@
+Project [none#2, element_at(none#0, none#2, None, false) AS #0, element_at(none#1, none#2, None, false) AS #1]
++- !Project [none#0, none#1, none#2]
+   +- Generate explode([count,mean,stddev,min,max]), false, [none#0]
+      +- !Aggregate [map(cast(count as string), cast(count(none#0L) as string), cast(mean as string), cast(avg(none#0L) as string), cast(stddev as string), cast(stddev_samp(cast(none#0L as double)) as string), cast(min as string), cast(min(none#0L) as string), cast(max as string), cast(max(none#0L) as string)) AS #0, map(cast(count as string), cast(count(none#1) as string), cast(mean as string), cast(avg(none#1) as string), cast(stddev as string), cast(stddev_samp(none#1) as string), cas [...]
+         +- Project [none#0L, none#2]
+            +- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain
new file mode 100644
index 00000000000..e4da86166dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain
new file mode 100644
index 00000000000..e4da86166dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain
new file mode 100644
index 00000000000..9f8c1bb8d91
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#0L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain
new file mode 100644
index 00000000000..cd7bbac89df
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain
new file mode 100644
index 00000000000..b034ead771d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#2, none#0L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain
new file mode 100644
index 00000000000..d50b1411784
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain
@@ -0,0 +1,2 @@
+Project [none#1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain
new file mode 100644
index 00000000000..75d78bac2d9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain
@@ -0,0 +1,2 @@
+Project
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain
new file mode 100644
index 00000000000..d63618297e5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain
new file mode 100644
index 00000000000..7eaf23106a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain
new file mode 100644
index 00000000000..eb3c62c6f51
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain
@@ -0,0 +1,3 @@
+'Except false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain
new file mode 100644
index 00000000000..95882222ba0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain
@@ -0,0 +1,3 @@
+'Except All true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain
new file mode 100644
index 00000000000..4a4d83cbbe3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain
@@ -0,0 +1,2 @@
+'Filter (10.0 > 'exp('a))
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain
new file mode 100644
index 00000000000..cdcf089bf7a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain
@@ -0,0 +1,2 @@
+'Project [unresolvedalias((), None), unresolvedalias(foo('id), None), unresolvedalias(f3('id, 'id), None), unresolvedalias(bar('id, 'id, 'id), None), unresolvedalias(f_four('id, 'id, 'id, 'id), None)]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain
new file mode 100644
index 00000000000..71ec808a221
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain
@@ -0,0 +1,2 @@
+UnresolvedHint coalesce, [100]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain
new file mode 100644
index 00000000000..89b24beb6fe
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain
@@ -0,0 +1,3 @@
+'Intersect false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain
new file mode 100644
index 00000000000..5159e4f5d1e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain
@@ -0,0 +1,3 @@
+'Intersect All true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain
new file mode 100644
index 00000000000..a0f818eb7db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain
@@ -0,0 +1,3 @@
+'Join LeftAnti, '`=`('id, 'id)
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain
new file mode 100644
index 00000000000..f0bd1db03ac
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain
@@ -0,0 +1,3 @@
+'Join Inner, '`=`('a, 'a)
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain
new file mode 100644
index 00000000000..df290259b0a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain
@@ -0,0 +1,3 @@
+'Join Inner
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain
new file mode 100644
index 00000000000..715fa26ba2e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain
new file mode 100644
index 00000000000..715fa26ba2e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain
new file mode 100644
index 00000000000..bfcdcffbff9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain
new file mode 100644
index 00000000000..92056649124
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(FullOuter,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain
new file mode 100644
index 00000000000..577f544646e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(RightOuter,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain
new file mode 100644
index 00000000000..224790686d5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(LeftSemi,Buffer(id))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain
new file mode 100644
index 00000000000..36566b68c95
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id, 'a), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain
new file mode 100644
index 00000000000..e11778c63bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('a), ArraySeq(List('id)), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain
new file mode 100644
index 00000000000..2da57931059
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain
@@ -0,0 +1,2 @@
+Offset 1000
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain
new file mode 100644
index 00000000000..f183bc7be36
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain
new file mode 100644
index 00000000000..c976a65aab5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['b ASC NULLS FIRST, 'id ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain
new file mode 100644
index 00000000000..dd6c0fe4062
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain
@@ -0,0 +1,2 @@
+Repartition 24, true
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain
new file mode 100644
index 00000000000..2f169429f81
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['a ASC NULLS FIRST, 'id DESC NULLS FIRST]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain
new file mode 100644
index 00000000000..054f8ad05bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['b ASC NULLS FIRST, 'id DESC NULLS FIRST], 33
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain
new file mode 100644
index 00000000000..e6f09597d7d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['id, 'b]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain
new file mode 100644
index 00000000000..dbf1e52e2d3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['a, 'id], 22
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain
new file mode 100644
index 00000000000..9981205d7af
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain
@@ -0,0 +1,2 @@
+Sample 0.0, 0.43, false, 9890823
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain
new file mode 100644
index 00000000000..4e2d884bb37
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain
@@ -0,0 +1,2 @@
+Sample 0.0, 0.23, true, 898
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain
new file mode 100644
index 00000000000..53eaa9352a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain
@@ -0,0 +1,2 @@
+'Project [('a + 10) AS #0, ('id % 10) AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain
new file mode 100644
index 00000000000..61526cc6042
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain
@@ -0,0 +1,2 @@
+'Project ['id, 'a]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain
new file mode 100644
index 00000000000..79ae0d702aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain
new file mode 100644
index 00000000000..9d49e22afe4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['a ASC NULLS FIRST, 'id ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain
new file mode 100644
index 00000000000..79ae0d702aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain
new file mode 100644
index 00000000000..7bc8b68fe37
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['b ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain
new file mode 100644
index 00000000000..98f69703a4c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain
@@ -0,0 +1,5 @@
+Project [none#3, element_at(none#0, none#3, None, false) AS #0, element_at(none#1, none#3, None, false) AS #1, element_at(none#2, none#3, None, false) AS #2]
++- !Project [none#0, none#1, none#2, none#3]
+   +- Generate explode([mean,min]), false, [none#0]
+      +- Aggregate [map(cast(mean as string), cast(avg(none#0L) as string), cast(min as string), cast(min(none#0L) as string)) AS #0, map(cast(mean as string), cast(avg(none#1) as string), cast(min as string), cast(min(none#1) as string)) AS #1, map(cast(mean as string), cast(avg(none#2) as string), cast(min as string), cast(min(none#2) as string)) AS #2]
+         +- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain
new file mode 100644
index 00000000000..cc952f01a83
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain
@@ -0,0 +1,2 @@
+Project [none#2, cast(none#0L as int) AS #0]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain
new file mode 100644
index 00000000000..aa71dcd5231
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1 AS #1, none#2 AS #2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain
new file mode 100644
index 00000000000..8d2530c4959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain
@@ -0,0 +1,3 @@
+Union false, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
new file mode 100644
index 00000000000..8d2530c4959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
@@ -0,0 +1,3 @@
+Union false, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
new file mode 100644
index 00000000000..104217a175d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
@@ -0,0 +1,3 @@
+'Union true, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
new file mode 100644
index 00000000000..70d1955f58d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
@@ -0,0 +1,3 @@
+'Union true, true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain
new file mode 100644
index 00000000000..3d3331f31f9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain
new file mode 100644
index 00000000000..a7b5b530463
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id, 'a), ArraySeq(List('b)), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain
new file mode 100644
index 00000000000..9580f08b5e6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain
@@ -0,0 +1,2 @@
+'Filter '`=`('id, 1)
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain
new file mode 100644
index 00000000000..0c3be1cbef3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain
@@ -0,0 +1,2 @@
+'Filter (1000 > ('a + 'id))
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain
new file mode 100644
index 00000000000..2fed4e66ac5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2 AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain
new file mode 100644
index 00000000000..9a22c8ad5cb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1 AS #0, none#2 AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain
new file mode 100644
index 00000000000..07dec2eba0f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain
new file mode 100644
index 00000000000..fe8ec9459bc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1, none#2, (100 + none#1) AS #0]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain
new file mode 100644
index 00000000000..6702dc2f18e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, 123 AS #0, none#2, none#0L AS #1L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain
new file mode 100644
index 00000000000..97066dbe7df
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1, redacted AS #0, (100 + none#1) AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain
new file mode 100644
index 00000000000..07dec2eba0f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json
new file mode 100644
index 00000000000..1209a4dfb87
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json
@@ -0,0 +1,10 @@
+{
+  "subqueryAlias": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "alias": "fooz"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin
new file mode 100644
index 00000000000..1f969506e38
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin
@@ -0,0 +1,2 @@
+�,
+$Z" struct<id:bigint,a:int,b:double>fooz
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json
new file mode 100644
index 00000000000..ba861c077fb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json
@@ -0,0 +1,10 @@
+{
+  "subqueryAlias": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "alias": "bob"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin
new file mode 100644
index 00000000000..788038972d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>bob
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/apply.json b/connector/connect/common/src/test/resources/query-tests/queries/apply.json
new file mode 100644
index 00000000000..d53d24efef8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/apply.json
@@ -0,0 +1,14 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin
new file mode 100644
index 00000000000..80cb4ad244e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin
@@ -0,0 +1,3 @@
+-
+$Z" struct<id:bigint,a:int,b:double>
+a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_string.json b/connector/connect/common/src/test/resources/query-tests/queries/as_string.json
new file mode 100644
index 00000000000..501189fbe13
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_string.json
@@ -0,0 +1,10 @@
+{
+  "subqueryAlias": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "alias": "foo"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin
new file mode 100644
index 00000000000..f433909e126
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>foo
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json
new file mode 100644
index 00000000000..e2a7d2bb193
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json
@@ -0,0 +1,10 @@
+{
+  "subqueryAlias": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "alias": "bar"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin
new file mode 100644
index 00000000000..c9ad3f8e0c5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>bar
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json
new file mode 100644
index 00000000000..34a329f893a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json
@@ -0,0 +1,11 @@
+{
+  "repartition": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "numPartitions": 5,
+    "shuffle": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin
new file mode 100644
index 00000000000..ed1a6ce29fa
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/col.json b/connector/connect/common/src/test/resources/query-tests/queries/col.json
new file mode 100644
index 00000000000..aa2c09ce0d7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/col.json
@@ -0,0 +1,18 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "b"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin
new file mode 100644
index 00000000000..6db0c6bb0b0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin
@@ -0,0 +1,4 @@
+5
+$Z" struct<id:bigint,a:int,b:double>
+id
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json
new file mode 100644
index 00000000000..bb4c89e4516
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json
@@ -0,0 +1,14 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedRegex": {
+        "colName": "a|id"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin
new file mode 100644
index 00000000000..1a3b97ea0f5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin
@@ -0,0 +1,3 @@
+0
+$Z" struct<id:bigint,a:int,b:double>B
+a|id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json
new file mode 100644
index 00000000000..bac0eebf8ee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json
@@ -0,0 +1,15 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_CROSS"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin
new file mode 100644
index 00000000000..b86654bb57f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin
@@ -0,0 +1,2 @@
+*T
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/describe.json b/connector/connect/common/src/test/resources/query-tests/queries/describe.json
new file mode 100644
index 00000000000..c3e9ded3133
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/describe.json
@@ -0,0 +1,10 @@
+{
+  "describe": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": ["id", "b"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin
new file mode 100644
index 00000000000..a39ee7fc80c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin
@@ -0,0 +1,2 @@
+�-
+$Z" struct<id:bigint,a:int,b:double>idb
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
new file mode 100644
index 00000000000..094caa14208
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
@@ -0,0 +1,10 @@
+{
+  "deduplicate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "allColumnsAsKeys": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
new file mode 100644
index 00000000000..4892a7c7162
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
@@ -0,0 +1,2 @@
+r(
+$Z" struct<id:bigint,a:int,b:double>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
new file mode 100644
index 00000000000..094caa14208
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
@@ -0,0 +1,10 @@
+{
+  "deduplicate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "allColumnsAsKeys": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
new file mode 100644
index 00000000000..4892a7c7162
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
@@ -0,0 +1,2 @@
+r(
+$Z" struct<id:bigint,a:int,b:double>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
new file mode 100644
index 00000000000..e55248a383e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
@@ -0,0 +1,10 @@
+{
+  "deduplicate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "columnNames": ["a", "id"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
new file mode 100644
index 00000000000..51d419de570
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
@@ -0,0 +1,2 @@
+r-
+$Z" struct<id:bigint,a:int,b:double>aid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
new file mode 100644
index 00000000000..d2a9e041418
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
@@ -0,0 +1,10 @@
+{
+  "deduplicate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "columnNames": ["a", "b"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
new file mode 100644
index 00000000000..57c97010bdd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
@@ -0,0 +1,2 @@
+r,
+$Z" struct<id:bigint,a:int,b:double>ab
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
new file mode 100644
index 00000000000..7b2468c70a6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
@@ -0,0 +1,10 @@
+{
+  "deduplicate": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "columnNames": ["a", "b", "id"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
new file mode 100644
index 00000000000..496dc9e4516
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
@@ -0,0 +1,2 @@
+r0
+$Z" struct<id:bigint,a:int,b:double>abid
\ No newline at end of file
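
The distinct and dropDuplicates golden files above all lower to a Deduplicate relation: distinct() and the no-argument dropDuplicates() set allColumnsAsKeys = true, while the name-based overloads carry the columnNames list. A hedged sketch, reusing the assumed df:

    val d1 = df.distinct()                      // allColumnsAsKeys = true
    val d2 = df.dropDuplicates()                // same plan as distinct()
    val d3 = df.dropDuplicates(Seq("a", "b"))   // columnNames = ["a", "b"]
    val d4 = df.dropDuplicates("a", "b", "id")  // varargs overload
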
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json
new file mode 100644
index 00000000000..275f15a04e1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json
@@ -0,0 +1,18 @@
+{
+  "drop": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "b"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin
new file mode 100644
index 00000000000..65d2cac04cc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin
@@ -0,0 +1,4 @@
+�5
+$Z" struct<id:bigint,a:int,b:double>
+b
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json
new file mode 100644
index 00000000000..96d50cef533
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json
@@ -0,0 +1,22 @@
+{
+  "drop": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "b"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin
new file mode 100644
index 00000000000..29ae3de0fcc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin
@@ -0,0 +1,5 @@
+�<
+$Z" struct<id:bigint,a:int,b:double>
+id
+a
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json
new file mode 100644
index 00000000000..454bb1744b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json
@@ -0,0 +1,14 @@
+{
+  "drop": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "b"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin
new file mode 100644
index 00000000000..3cf2a6b652a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin
@@ -0,0 +1,3 @@
+�-
+$Z" struct<id:bigint,a:int,b:double>
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json
new file mode 100644
index 00000000000..812fc60d9d2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json
@@ -0,0 +1,14 @@
+{
+  "drop": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "cols": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin
new file mode 100644
index 00000000000..a4146717a0b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin
@@ -0,0 +1,3 @@
+�-
+$Z" struct<id:bigint,a:int,b:double>
+a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/except.json b/connector/connect/common/src/test/resources/query-tests/queries/except.json
new file mode 100644
index 00000000000..d88134dc268
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/except.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_EXCEPT",
+    "isAll": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin
new file mode 100644
index 00000000000..64fcb2a45c9
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json
new file mode 100644
index 00000000000..dfb04bf53a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_EXCEPT",
+    "isAll": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin
new file mode 100644
index 00000000000..cc746e54e00
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin
@@ -0,0 +1,2 @@
+2P
+$Z" struct<id:bigint,a:int,b:double>$Z" struct<id:bigint,a:int,b:double> 
\ No newline at end of file
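
except and exceptAll above both become a SetOp relation with SET_OP_TYPE_EXCEPT; only the isAll flag differs (intersect and intersectAll below follow the same pattern). A hedged sketch, assuming a second frame with the same schema:

    val other = spark.range(5).selectExpr("id", "cast(id as int) as a", "cast(id as double) as b")
    val minus    = df.except(other)      // isAll = false
    val minusAll = df.exceptAll(other)   // isAll = true
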
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json
new file mode 100644
index 00000000000..77635b51172
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json
@@ -0,0 +1,14 @@
+{
+  "filter": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "condition": {
+      "expressionString": {
+        "expression": "exp(a) \u003c 10.0"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin
new file mode 100644
index 00000000000..e7ddb9c54fa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin
@@ -0,0 +1,3 @@
+"9
+$Z" struct<id:bigint,a:int,b:double>"
+
exp(a) < 10.0
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json
new file mode 100644
index 00000000000..76738354e15
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json
@@ -0,0 +1,96 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "commonInlineUserDefinedFunction": {
+        "scalarScalaUdf": {
+          "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQxBwIoVHJpGRUCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+        }
+      }
+    }, {
+      "commonInlineUserDefinedFunction": {
+        "functionName": "foo",
+        "deterministic": true,
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }],
+        "scalarScalaUdf": {
+          "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQyWQlE7Ce2cPkCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+          "nullable": true
+        }
+      }
+    }, {
+      "commonInlineUserDefinedFunction": {
+        "functionName": "f3",
+        "deterministic": true,
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }],
+        "scalarScalaUdf": {
+          "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQzyD4NN4Grh74CAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+          "nullable": true
+        }
+      }
+    }, {
+      "commonInlineUserDefinedFunction": {
+        "functionName": "bar",
+        "deterministic": true,
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }],
+        "scalarScalaUdf": {
+          "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQ009cpyjjQtFMCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+          "nullable": true
+        }
+      }
+    }, {
+      "commonInlineUserDefinedFunction": {
+        "functionName": "f_four",
+        "deterministic": true,
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }],
+        "scalarScalaUdf": {
+          "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQ1hQsS9jxAO/gCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+          "nullable": true
+        }
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin
new file mode 100644
index 00000000000..aa1971d3112
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/hint.json b/connector/connect/common/src/test/resources/query-tests/queries/hint.json
new file mode 100644
index 00000000000..38f3ff1ab7a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/hint.json
@@ -0,0 +1,15 @@
+{
+  "hint": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "name": "coalesce",
+    "parameters": [{
+      "literal": {
+        "integer": 100
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin
new file mode 100644
index 00000000000..8832794f792
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin
@@ -0,0 +1,3 @@
+�6
+$Z" struct<id:bigint,a:int,b:double>coalesce
+0d
\ No newline at end of file
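
The hint golden file above records Dataset.hint together with its literal parameters. A hedged sketch, reusing the assumed df:

    // name = "coalesce", parameters = [100]
    val hinted = df.hint("coalesce", 100)
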
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersect.json b/connector/connect/common/src/test/resources/query-tests/queries/intersect.json
new file mode 100644
index 00000000000..cd8167116c0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/intersect.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_INTERSECT",
+    "isAll": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin
new file mode 100644
index 00000000000..71816354cda
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json
new file mode 100644
index 00000000000..9fd2a7a3727
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_INTERSECT",
+    "isAll": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin
new file mode 100644
index 00000000000..5e3325c73c1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin
@@ -0,0 +1,2 @@
+2P
+$Z" struct<id:bigint,a:int,b:double>$Z" struct<id:bigint,a:int,b:double> 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json
new file mode 100644
index 00000000000..b152642b547
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json
@@ -0,0 +1,29 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinCondition": {
+      "unresolvedFunction": {
+        "functionName": "\u003d",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }]
+      }
+    },
+    "joinType": "JOIN_TYPE_LEFT_ANTI"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin
new file mode 100644
index 00000000000..a1464a72ea8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin
@@ -0,0 +1,5 @@
+*k
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary>
+=
+id
+id 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json
new file mode 100644
index 00000000000..28949407af4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json
@@ -0,0 +1,29 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinCondition": {
+      "unresolvedFunction": {
+        "functionName": "\u003d",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin
new file mode 100644
index 00000000000..9d0a3ca5d08
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin
@@ -0,0 +1,5 @@
+*i
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary>
+=
+a
+a 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json
new file mode 100644
index 00000000000..0308a128db3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json
@@ -0,0 +1,15 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin
new file mode 100644
index 00000000000..9a269059bf7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin
@@ -0,0 +1,2 @@
+*T
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json
new file mode 100644
index 00000000000..9f9f1a0cf30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER",
+    "usingColumns": ["id", "a"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin
new file mode 100644
index 00000000000..0e4192d7afc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin
@@ -0,0 +1,2 @@
+*[
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id*a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json
new file mode 100644
index 00000000000..9f9f1a0cf30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER",
+    "usingColumns": ["id", "a"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin
new file mode 100644
index 00000000000..0e4192d7afc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin
@@ -0,0 +1,2 @@
+*[
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id*a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json
new file mode 100644
index 00000000000..b5137f978a9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_INNER",
+    "usingColumns": ["id"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin
new file mode 100644
index 00000000000..0708749bd47
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin
@@ -0,0 +1,2 @@
+*X
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json
new file mode 100644
index 00000000000..4bd0b9ec2c7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_FULL_OUTER",
+    "usingColumns": ["id", "a"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin
new file mode 100644
index 00000000000..954128f8c1e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin
@@ -0,0 +1,2 @@
+*[
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id*a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json
new file mode 100644
index 00000000000..3d06f7ab31f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_RIGHT_OUTER",
+    "usingColumns": ["id", "a"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin
new file mode 100644
index 00000000000..5878e776d87
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin
@@ -0,0 +1,2 @@
+*[
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id*a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json
new file mode 100644
index 00000000000..b5b8b3a40f9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json
@@ -0,0 +1,16 @@
+{
+  "join": {
+    "left": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "right": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "joinType": "JOIN_TYPE_LEFT_SEMI",
+    "usingColumns": ["id"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin
new file mode 100644
index 00000000000..0c72726a8c2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin
@@ -0,0 +1,2 @@
+*X
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> *id
\ No newline at end of file
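
The join_* golden files above cover the three shapes of join in this change: a bare cross or inner join, a join with a Column condition, and a join with usingColumns plus a join-type string. A hedged sketch, assuming a right-hand frame with schema struct<a:int,id:bigint,payload:binary> (the binary column is approximated here with a cast for illustration):

    val right = spark.range(5).selectExpr(
      "cast(id as int) as a", "id", "cast(cast(id as string) as binary) as payload")
    val crossed = df.crossJoin(right)                                   // JOIN_TYPE_CROSS
    val anti    = df.join(right, df("id") === right("id"), "left_anti") // condition + join type
    val using   = df.join(right, Seq("id", "a"), "full_outer")          // usingColumns + join type
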
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json
new file mode 100644
index 00000000000..292adb11b17
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json
@@ -0,0 +1,19 @@
+{
+  "unpivot": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "ids": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }],
+    "valueColumnName": "value"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin
new file mode 100644
index 00000000000..dc1a40f8d48
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin
@@ -0,0 +1,4 @@
+�<
+$Z" struct<id:bigint,a:int,b:double>
+id
+a*value
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json
new file mode 100644
index 00000000000..79d494c3c5d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json
@@ -0,0 +1,22 @@
+{
+  "unpivot": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "ids": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }],
+    "values": {
+      "values": [{
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      }]
+    },
+    "valueColumnName": "value"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin
new file mode 100644
index 00000000000..1ac7cb290f8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin
@@ -0,0 +1,5 @@
+�>
+$Z" struct<id:bigint,a:int,b:double>
+a
+
+id*value
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/offset.json b/connector/connect/common/src/test/resources/query-tests/queries/offset.json
new file mode 100644
index 00000000000..e03d5072ea4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/offset.json
@@ -0,0 +1,10 @@
+{
+  "offset": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "offset": 1000
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin
new file mode 100644
index 00000000000..8c7dde8f92c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin
@@ -0,0 +1,2 @@
+j)
+$Z" struct<id:bigint,a:int,b:double>�
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json
new file mode 100644
index 00000000000..84cb594f6a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json
@@ -0,0 +1,35 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "a"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin
new file mode 100644
index 00000000000..4061563400c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json
new file mode 100644
index 00000000000..41bf89273e2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json
@@ -0,0 +1,35 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "a"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin
new file mode 100644
index 00000000000..e2f5ccbdd9a
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition.json
new file mode 100644
index 00000000000..770b227b707
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition.json
@@ -0,0 +1,11 @@
+{
+  "repartition": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "numPartitions": 24,
+    "shuffle": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin
new file mode 100644
index 00000000000..03bb4787617
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin
@@ -0,0 +1,2 @@
+�*
+$Z" struct<id:bigint,a:int,b:double>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json
new file mode 100644
index 00000000000..deb4c618885
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json
@@ -0,0 +1,30 @@
+{
+  "repartitionByExpression": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "partitionExprs": [{
+      "sortOrder": {
+        "child": {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        },
+        "direction": "SORT_DIRECTION_ASCENDING",
+        "nullOrdering": "SORT_NULLS_FIRST"
+      }
+    }, {
+      "sortOrder": {
+        "child": {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        },
+        "direction": "SORT_DIRECTION_DESCENDING",
+        "nullOrdering": "SORT_NULLS_FIRST"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin
new file mode 100644
index 00000000000..531737b7548
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin
@@ -0,0 +1,6 @@
+�E
+$Z" struct<id:bigint,a:int,b:double>
J
+
+aJ
+
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json
new file mode 100644
index 00000000000..eede8a4b1ac
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json
@@ -0,0 +1,31 @@
+{
+  "repartitionByExpression": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "partitionExprs": [{
+      "sortOrder": {
+        "child": {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        },
+        "direction": "SORT_DIRECTION_ASCENDING",
+        "nullOrdering": "SORT_NULLS_FIRST"
+      }
+    }, {
+      "sortOrder": {
+        "child": {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        },
+        "direction": "SORT_DIRECTION_DESCENDING",
+        "nullOrdering": "SORT_NULLS_FIRST"
+      }
+    }],
+    "numPartitions": 33
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin
new file mode 100644
index 00000000000..8139f2cb397
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin
@@ -0,0 +1,6 @@
+�G
+$Z" struct<id:bigint,a:int,b:double>
J
+
+bJ
+
+id!
\ No newline at end of file
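
repartitionByRange above becomes a RepartitionByExpression whose partition expressions are wrapped in SortOrder messages; a column given without an explicit ordering defaults to ascending, nulls first. A hedged sketch matching the second file (33 partitions, "b" ascending by default, "id" descending):

    import org.apache.spark.sql.functions.col

    val ranged = df.repartitionByRange(33, col("b"), col("id").desc)
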
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json
new file mode 100644
index 00000000000..c91a30eb0e9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json
@@ -0,0 +1,18 @@
+{
+  "repartitionByExpression": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "partitionExprs": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "b"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin
new file mode 100644
index 00000000000..c217e9d9d93
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin
@@ -0,0 +1,4 @@
+�5
+$Z" struct<id:bigint,a:int,b:double>
+id
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json
new file mode 100644
index 00000000000..d70380d1228
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json
@@ -0,0 +1,19 @@
+{
+  "repartitionByExpression": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "partitionExprs": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "numPartitions": 22
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin
new file mode 100644
index 00000000000..47b3ab9daf5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin
@@ -0,0 +1,4 @@
+�7
+$Z" struct<id:bigint,a:int,b:double>
+a
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json
new file mode 100644
index 00000000000..3f4dbedac3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json
@@ -0,0 +1,12 @@
+{
+  "sample": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "upperBound": 0.43,
+    "withReplacement": false,
+    "seed": "9890823"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin
new file mode 100644
index 00000000000..7754a5e213d
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json
new file mode 100644
index 00000000000..5f9f6f21966
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json
@@ -0,0 +1,12 @@
+{
+  "sample": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "upperBound": 0.23,
+    "withReplacement": true,
+    "seed": "898"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin
new file mode 100644
index 00000000000..2e1efe2e527
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin
@@ -0,0 +1,3 @@
+b4
+$Z" struct<id:bigint,a:int,b:double>q=
+ףp�? (�
\ No newline at end of file
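
The two sample golden files record Dataset.sample: the fraction is stored as upperBound and the seed is carried along. A hedged sketch, reusing the assumed df:

    val s1 = df.sample(0.43, 9890823L)    // withReplacement defaults to false
    val s2 = df.sample(true, 0.23, 898L)  // withReplacement = true
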
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json
new file mode 100644
index 00000000000..b38c071d823
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json
@@ -0,0 +1,18 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "expressionString": {
+        "expression": "a + 10 as x"
+      }
+    }, {
+      "expressionString": {
+        "expression": "id % 10 as grp"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin
new file mode 100644
index 00000000000..9f203955a8d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin
@@ -0,0 +1,4 @@
+K
+$Z" struct<id:bigint,a:int,b:double>"
+a + 10 as x"
+id % 10 as grp
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json
new file mode 100644
index 00000000000..cde5f0721a3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json
@@ -0,0 +1,18 @@
+{
+  "project": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin
new file mode 100644
index 00000000000..507d3c15e61
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin
@@ -0,0 +1,4 @@
+5
+$Z" struct<id:bigint,a:int,b:double>
+id
+a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json
new file mode 100644
index 00000000000..08d98935f40
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json
@@ -0,0 +1,27 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin
new file mode 100644
index 00000000000..5c076fa9d12
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json
new file mode 100644
index 00000000000..6b13151641d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json
@@ -0,0 +1,27 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "a"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin
new file mode 100644
index 00000000000..572abd9c7a0
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json
new file mode 100644
index 00000000000..08d98935f40
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json
@@ -0,0 +1,27 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin
new file mode 100644
index 00000000000..5c076fa9d12
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json
new file mode 100644
index 00000000000..16220ca0947
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json
@@ -0,0 +1,27 @@
+{
+  "sort": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "order": [{
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }, {
+      "child": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "a"
+        }
+      },
+      "direction": "SORT_DIRECTION_ASCENDING",
+      "nullOrdering": "SORT_NULLS_FIRST"
+    }],
+    "isGlobal": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin
new file mode 100644
index 00000000000..01a61b9c158
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin differ
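
For reference, the four sort/sortWithinPartitions golden files above encode plans of the shape produced by calls like the following. This is a minimal sketch only, assuming a client DataFrame df with the schema struct<id:bigint,a:int,b:double> used throughout these query tests.

    import org.apache.spark.sql.functions.col

    df.sortWithinPartitions("a", "id")   // String* overload; per-partition sort
    df.sort(col("b"), col("a"))          // Column* overload; each column becomes an order entry
                                         // with SORT_DIRECTION_ASCENDING and SORT_NULLS_FIRST,
                                         // while the isGlobal flag separates a global sort from
                                         // a sortWithinPartitions
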
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/summary.json b/connector/connect/common/src/test/resources/query-tests/queries/summary.json
new file mode 100644
index 00000000000..48c336fbf7c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/summary.json
@@ -0,0 +1,10 @@
+{
+  "summary": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "statistics": ["mean", "min"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin
new file mode 100644
index 00000000000..69363710ef2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin
@@ -0,0 +1,2 @@
+�1
+$Z" struct<id:bigint,a:int,b:double>meanmin
\ No newline at end of file
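
The summary plan above maps one-to-one onto the statistics passed to Dataset.summary; a minimal sketch, assuming the same df:

    df.summary("mean", "min")   // the varargs become the "statistics" list of the Summary relation
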
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to.json b/connector/connect/common/src/test/resources/query-tests/queries/to.json
new file mode 100644
index 00000000000..e09913ff562
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to.json
@@ -0,0 +1,28 @@
+{
+  "toSchema": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "schema": {
+      "struct": {
+        "fields": [{
+          "name": "b",
+          "dataType": {
+            "double": {
+            }
+          },
+          "nullable": true
+        }, {
+          "name": "id",
+          "dataType": {
+            "integer": {
+            }
+          },
+          "nullable": true
+        }]
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin
new file mode 100644
index 00000000000..ce3c1509673
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin differ
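
The toSchema plan above carries the full target schema; a minimal sketch of a call that would produce it, assuming the same df:

    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

    val target = new StructType()
      .add("b", DoubleType)     // nullable by default, matching the golden JSON
      .add("id", IntegerType)
    df.to(target)               // the StructType is serialized into toSchema.schema
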
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toDF.json b/connector/connect/common/src/test/resources/query-tests/queries/toDF.json
new file mode 100644
index 00000000000..e753b7d0e3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/toDF.json
@@ -0,0 +1,10 @@
+{
+  "toDf": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "columnNames": ["x1", "x2", "x3"]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin
new file mode 100644
index 00000000000..b88bdf99169
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin
@@ -0,0 +1,2 @@
+�2
+$Z" struct<id:bigint,a:int,b:double>x1x2x3
\ No newline at end of file
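
The toDf plan simply renames the output columns; a minimal sketch, assuming the same df:

    df.toDF("x1", "x2", "x3")   // columnNames carries the new names positionally
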
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/union.json b/connector/connect/common/src/test/resources/query-tests/queries/union.json
new file mode 100644
index 00000000000..170e0f09cf9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/union.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_UNION",
+    "isAll": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin
new file mode 100644
index 00000000000..7c4f869e44f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin
@@ -0,0 +1,2 @@
+2P
+$Z" struct<id:bigint,a:int,b:double>$Z" struct<id:bigint,a:int,b:double> 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json
new file mode 100644
index 00000000000..170e0f09cf9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json
@@ -0,0 +1,16 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_UNION",
+    "isAll": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin
new file mode 100644
index 00000000000..7c4f869e44f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin
@@ -0,0 +1,2 @@
+2P
+$Z" struct<id:bigint,a:int,b:double>$Z" struct<id:bigint,a:int,b:double> 
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json
new file mode 100644
index 00000000000..433ba22fa32
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json
@@ -0,0 +1,18 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_UNION",
+    "isAll": true,
+    "byName": true,
+    "allowMissingColumns": false
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin
new file mode 100644
index 00000000000..e08dc6d9fdf
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json
new file mode 100644
index 00000000000..70e770c135f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json
@@ -0,0 +1,18 @@
+{
+  "setOp": {
+    "leftInput": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "rightInput": {
+      "localRelation": {
+        "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+      }
+    },
+    "setOpType": "SET_OP_TYPE_UNION",
+    "isAll": true,
+    "byName": true,
+    "allowMissingColumns": true
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin
new file mode 100644
index 00000000000..29e0896b6c6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin
@@ -0,0 +1,2 @@
+2Z
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary> (0
\ No newline at end of file
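
The four set-operation golden files above all use SET_OP_TYPE_UNION with isAll = true (DataFrame union keeps UNION ALL semantics); a minimal sketch, assuming df as before and other assumed to be a DataFrame with schema struct<a:int,id:bigint,payload:binary>:

    df.union(df)                                        // byName omitted, i.e. false
    df.unionAll(df)                                     // alias of union, hence the identical golden file
    df.unionByName(other)                               // byName = true, allowMissingColumns = false
    df.unionByName(other, allowMissingColumns = true)   // byName = true, allowMissingColumns = true
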
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json
new file mode 100644
index 00000000000..662328af9a7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json
@@ -0,0 +1,15 @@
+{
+  "unpivot": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "ids": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }],
+    "valueColumnName": "value"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin
new file mode 100644
index 00000000000..9f2600404f2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin
@@ -0,0 +1,3 @@
+�5
+$Z" struct<id:bigint,a:int,b:double>
+id*value
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json
new file mode 100644
index 00000000000..3557958fd29
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json
@@ -0,0 +1,26 @@
+{
+  "unpivot": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "ids": [{
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "id"
+      }
+    }, {
+      "unresolvedAttribute": {
+        "unparsedIdentifier": "a"
+      }
+    }],
+    "values": {
+      "values": [{
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "b"
+        }
+      }]
+    },
+    "valueColumnName": "value"
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin
new file mode 100644
index 00000000000..9fb45c9c0c1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin
@@ -0,0 +1,6 @@
+�E
+$Z" struct<id:bigint,a:int,b:double>
+id
+a
+
+b*value
\ No newline at end of file
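
The unpivot plans above list the id columns, the optional value columns, and the value column name; a minimal sketch, assuming the same df. The variable column name ("key" below) is a placeholder, since only valueColumnName is visible in the golden JSON:

    import org.apache.spark.sql.functions.col

    // "key" is a hypothetical variable column name used only for illustration.
    df.unpivot(Array(col("id")), "key", "value")                             // values left implicit
    df.unpivot(Array(col("id"), col("a")), Array(col("b")), "key", "value")  // explicit value columns
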
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_column.json b/connector/connect/common/src/test/resources/query-tests/queries/where_column.json
new file mode 100644
index 00000000000..f8ec863648c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_column.json
@@ -0,0 +1,23 @@
+{
+  "filter": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "condition": {
+      "unresolvedFunction": {
+        "functionName": "\u003d",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "literal": {
+            "long": "1"
+          }
+        }]
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin
new file mode 100644
index 00000000000..eae5db282ca
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin
@@ -0,0 +1,5 @@
+";
+$Z" struct<id:bigint,a:int,b:double>
+=
+id
+8
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json
new file mode 100644
index 00000000000..c244ff0e891
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json
@@ -0,0 +1,14 @@
+{
+  "filter": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "condition": {
+      "expressionString": {
+        "expression": "a + id \u003c 1000"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin
new file mode 100644
index 00000000000..63b44b32b63
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin
@@ -0,0 +1,3 @@
+"9
+$Z" struct<id:bigint,a:int,b:double>"
+
a + id < 1000
\ No newline at end of file
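
The two filter plans above show the Column-based and String-based condition forms; a minimal sketch, assuming the same df:

    import org.apache.spark.sql.functions.col

    df.where(col("id") === 1)   // Column condition: unresolvedFunction "=" over id and the literal 1
    df.where("a + id < 1000")   // String condition: carried verbatim as an expressionString
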
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json
new file mode 100644
index 00000000000..ba7d76aec99
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json
@@ -0,0 +1,13 @@
+{
+  "withColumnsRenamed": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "renameColumnsMap": {
+      "b": "bravo",
+      "id": "nid"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin
new file mode 100644
index 00000000000..38130a873d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin
@@ -0,0 +1,5 @@
+�=
+$Z" struct<id:bigint,a:int,b:double>
+
+bbravo	
+idnid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json
new file mode 100644
index 00000000000..ee507379fa4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json
@@ -0,0 +1,13 @@
+{
+  "withColumnsRenamed": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "renameColumnsMap": {
+      "a": "alpha",
+      "b": "beta"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin
new file mode 100644
index 00000000000..5a8cfaa51ee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin
@@ -0,0 +1,5 @@
+�=
+$Z" struct<id:bigint,a:int,b:double>
+
+aalpha	
+bbeta
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json
new file mode 100644
index 00000000000..8d0bfc254eb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json
@@ -0,0 +1,12 @@
+{
+  "withColumnsRenamed": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "renameColumnsMap": {
+      "id": "nid"
+    }
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin
new file mode 100644
index 00000000000..5c60a64d3ad
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin
@@ -0,0 +1,3 @@
+�1
+$Z" struct<id:bigint,a:int,b:double>	
+idnid
\ No newline at end of file
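
All three rename variants above end up in the same withColumnsRenamed relation; a minimal sketch, assuming the same df (the Java-map overload takes a java.util.Map but yields the same plan shape):

    df.withColumnRenamed("id", "nid")                          // single rename
    df.withColumnsRenamed(Map("a" -> "alpha", "b" -> "beta"))  // Scala Map overload
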
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json
new file mode 100644
index 00000000000..5bf8b01d5c4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json
@@ -0,0 +1,17 @@
+{
+  "withColumns": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "aliases": [{
+      "expr": {
+        "expressionString": {
+          "expression": "a + 100"
+        }
+      },
+      "name": ["z"]
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin
new file mode 100644
index 00000000000..f6693d6b8b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin
@@ -0,0 +1,4 @@
+�8
+$Z" struct<id:bigint,a:int,b:double>
+"	
+a + 100z
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json
new file mode 100644
index 00000000000..21e75d07aef
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json
@@ -0,0 +1,24 @@
+{
+  "withColumns": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "aliases": [{
+      "expr": {
+        "literal": {
+          "string": "123"
+        }
+      },
+      "name": ["a"]
+    }, {
+      "expr": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "name": ["g"]
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin
new file mode 100644
index 00000000000..f71f6248959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin
@@ -0,0 +1,6 @@
+�A
+$Z" struct<id:bigint,a:int,b:double>
+
+j123a
+
+idg
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json
new file mode 100644
index 00000000000..aca9ce7db7d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json
@@ -0,0 +1,24 @@
+{
+  "withColumns": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "aliases": [{
+      "expr": {
+        "literal": {
+          "string": "redacted"
+        }
+      },
+      "name": ["b"]
+    }, {
+      "expr": {
+        "expressionString": {
+          "expression": "a + 100"
+        }
+      },
+      "name": ["z"]
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin
new file mode 100644
index 00000000000..52a22ed9cec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin
@@ -0,0 +1,7 @@
+�K
+$Z" struct<id:bigint,a:int,b:double>
+
+
+jredactedb
+"	
+a + 100z
\ No newline at end of file
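
The withColumn/withColumns plans above pair each expression with its target column name; a minimal sketch, assuming the same df and lit/expr from functions:

    import org.apache.spark.sql.functions.{expr, lit}

    df.withColumn("z", expr("a + 100"))                                  // single column
    df.withColumns(Map("b" -> lit("redacted"), "z" -> expr("a + 100")))  // Scala Map overload
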
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json
new file mode 100644
index 00000000000..0f28e7655a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json
@@ -0,0 +1,18 @@
+{
+  "withColumns": {
+    "input": {
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "aliases": [{
+      "expr": {
+        "unresolvedAttribute": {
+          "unparsedIdentifier": "id"
+        }
+      },
+      "name": ["id"],
+      "metadata": "{\"description\":\"unique identifier\"}"
+    }]
+  }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin
new file mode 100644
index 00000000000..e9aa65874e4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin
@@ -0,0 +1,4 @@
+�Y
+$Z" struct<id:bigint,a:int,b:double>1
+
+idid#{"description":"unique identifier"}
\ No newline at end of file
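
withMetadata reuses the withColumns relation and serializes the column metadata as a JSON string; a minimal sketch, assuming the same df:

    import org.apache.spark.sql.types.MetadataBuilder

    val metadata = new MetadataBuilder()
      .putString("description", "unique identifier")
      .build()
    df.withMetadata("id", metadata)   // the metadata JSON lands in the alias' "metadata" field
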
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index f91040a1009..3e301bb8823 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -25,8 +25,9 @@ import org.apache.spark.connect.proto.Expression.ExpressionString
 import org.apache.spark.connect.proto.Join.JoinType
 import org.apache.spark.connect.proto.SetOperation.SetOpType
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue
+import org.apache.spark.sql.connect.planner.SaveModeConverter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -209,7 +210,7 @@ package object dsl {
 
         mode
           .map(SaveMode.valueOf(_))
-          .map(DataTypeProtoConverter.toSaveModeProto(_))
+          .map(SaveModeConverter.toSaveModeProto(_))
           .foreach(writeOp.setMode(_))
 
         if (tableName.nonEmpty) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
index 38cce28ad6f..6ddaabb1b88 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connect.planner
 
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala
new file mode 100644
index 00000000000..41a9f97780c
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SaveMode
+
+/**
+ * Helper class for conversions between [[SaveMode]] and [[proto.WriteOperation.SaveMode]].
+ */
+object SaveModeConverter {
+  def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode = {
+    mode match {
+      case proto.WriteOperation.SaveMode.SAVE_MODE_APPEND => SaveMode.Append
+      case proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE => SaveMode.Ignore
+      case proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE => SaveMode.Overwrite
+      case proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS => SaveMode.ErrorIfExists
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Cannot convert from WriteOperation.SaveMode to Spark SaveMode: ${mode.getNumber}")
+    }
+  }
+
+  def toSaveModeProto(mode: SaveMode): proto.WriteOperation.SaveMode = {
+    mode match {
+      case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
+      case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
+      case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
+      case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Cannot convert from SaveMode to WriteOperation.SaveMode: ${mode.name()}")
+    }
+  }
+}
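
A minimal round-trip sketch of the converter defined above; SAVE_MODE_UNSPECIFIED (or any other unmapped value) is rejected with an IllegalArgumentException:

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.connect.planner.SaveModeConverter

    val protoMode: proto.WriteOperation.SaveMode =
      SaveModeConverter.toSaveModeProto(SaveMode.Overwrite)   // SaveMode -> proto enum
    assert(SaveModeConverter.toSaveMode(protoMode) == SaveMode.Overwrite)  // and back
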
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 2924ee29e04..88bfbe8a8e0 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, L
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, Intersect, LocalRelation, LogicalPlan, Sample, Sort, SubqueryAlias, Union, Unpivot, UnresolvedHint}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
-import org.apache.spark.sql.connect.common.UdfPacket
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, UdfPacket}
 import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.{toCatalystExpression, toCatalystValue}
 import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -48,11 +48,6 @@ import org.apache.spark.sql.internal.CatalogImpl
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-final case class InvalidPlanInput(
-    private val message: String = "",
-    private val cause: Throwable = None.orNull)
-    extends Exception(message, cause)
-
 final case class InvalidCommandInput(
     private val message: String = "",
     private val cause: Throwable = null)
@@ -1509,7 +1504,7 @@ class SparkConnectPlanner(val session: SparkSession) {
 
     val w = dataset.write
     if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
-      w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
+      w.mode(SaveModeConverter.toSaveMode(writeOperation.getMode))
     }
 
     if (writeOperation.getOptionsCount > 0) {
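
InvalidPlanInput is removed from the planner above and is now imported from org.apache.spark.sql.connect.common throughout the planner and its tests; presumably the relocated definition keeps the same shape as the one deleted here, roughly:

    package org.apache.spark.sql.connect.common

    // Sketch of the relocated exception; mirrors the case class removed from SparkConnectPlanner.
    final case class InvalidPlanInput(
        private val message: String = "",
        private val cause: Throwable = None.orNull)
        extends Exception(message, cause)
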
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 05aa2428140..0a5b4197b78 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -43,8 +43,9 @@ import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
 import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
-import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
 import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExplainMode, ExtendedMode, FormattedMode, SimpleMode}
 
 /**
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 7470d539787..3e4a0f94ea2 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue
 import org.apache.spark.sql.execution.arrow.ArrowConverters
 import org.apache.spark.sql.test.SharedSparkSession
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index e94b6d137cd..f94fc215ee3 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.dsl.MockRemoteSession
 import org.apache.spark.sql.connect.dsl.commands._
 import org.apache.spark.sql.connect.dsl.expressions._
@@ -642,7 +643,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
 
   test("SaveMode conversion tests") {
     assertThrows[IllegalArgumentException](
-      DataTypeProtoConverter.toSaveMode(proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED))
+      SaveModeConverter.toSaveMode(proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED))
 
     val combinations = Seq(
       (SaveMode.Append, proto.WriteOperation.SaveMode.SAVE_MODE_APPEND),
@@ -650,8 +651,8 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
       (SaveMode.Overwrite, proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE),
       (SaveMode.ErrorIfExists, proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS))
     combinations.foreach { a =>
-      assert(DataTypeProtoConverter.toSaveModeProto(a._1) == a._2)
-      assert(DataTypeProtoConverter.toSaveMode(a._2) == a._1)
+      assert(SaveModeConverter.toSaveModeProto(a._1) == a._2)
+      assert(SaveModeConverter.toSaveMode(a._2) == a._1)
     }
   }
 
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
index dfb7f5d0f90..7abe4a4d085 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
@@ -25,8 +25,9 @@ import org.apache.spark.connect.proto.Relation
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.common.InvalidPlanInput
 import org.apache.spark.sql.connect.config.Connect
-import org.apache.spark.sql.connect.planner.{InvalidPlanInput, SparkConnectPlanner, SparkConnectPlanTest}
+import org.apache.spark.sql.connect.planner.{SparkConnectPlanner, SparkConnectPlanTest}
 import org.apache.spark.sql.test.SharedSparkSession
 
 class DummyPlugin extends RelationPlugin {

