Posted to commits@spark.apache.org by hv...@apache.org on 2023/02/15 14:54:34 UTC
[spark] branch branch-3.4 updated: [SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new a46278f98b3 [SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client
a46278f98b3 is described below
commit a46278f98b3500dd0ec22d4580f954c8c8a1ddb3
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Wed Feb 15 10:54:06 2023 -0400
[SPARK-42440][CONNECT] Initial set of Dataframe APIs for Scala Client
### What changes were proposed in this pull request?
Add a large portion of the existing DataFrame APIs to the Spark Connect Scala Client.
This PR does not contain:
- Typed APIs
- Aggregation
- Streaming (not supported by connect just yet)
- NA/Stats functions
- TempView registration.
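For a rough sense of the surface being added, a minimal usage sketch (illustrative only, not taken from this patch; it assumes a Spark Connect `SparkSession` named `spark` and a `people` table with `id` and `name` columns):

    val df = spark.sql("select * from people")
    df.where("id > 10")                   // new filter/where variants
      .orderBy(df("name").asc_nulls_last) // new sort APIs and Column ordering helpers
      .limit(5)
      .show()                             // new action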
### Why are the changes needed?
We want the Scala Client Dataset to reach parity with the existing Dataset.
### How was this patch tested?
Added a lot of golden tests.
Added a number of test cases to the E2E suite for the functionality that requires server interaction.
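Conceptually, each golden test builds a query with the client and compares the relation it generates against a checked-in artifact (the `.explain`/`.json`/`.proto.bin` files below). A hypothetical shape only; `simple` and `readGoldenRelation` are illustrative stand-ins, not the suite's actual helpers:

    test("offset") {
      val actual = simple.offset(10).plan.getRoot // relation proto built client-side
      val expected = readGoldenRelation("queries/offset.proto.bin")
      assert(actual == expected)
    }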
Closes #40019 from hvanhovell/SPARK-42440.
Authored-by: Herman van Hovell <he...@databricks.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
(cherry picked from commit 9843c7c9a259250f94678e0502fa21f58c80d27c)
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
.../main/scala/org/apache/spark/sql/Column.scala | 117 ++
.../main/scala/org/apache/spark/sql/Dataset.scala | 2201 +++++++++++++++++++-
.../scala/org/apache/spark/sql/SparkSession.scala | 16 +-
.../sql/connect/client/SparkConnectClient.scala | 3 +-
.../scala/org/apache/spark/sql/functions.scala | 14 +
.../org/apache/spark/sql/ClientE2ETestSuite.scala | 184 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 4 +
.../apache/spark/sql/PlanGenerationTestSuite.scala | 325 ++-
.../org/apache/spark/sql/SparkSessionSuite.scala | 2 +-
.../sql/connect/client/CompatibilitySuite.scala | 4 +-
.../connect/common}/DataTypeProtoConverter.scala | 29 +-
.../sql/connect/common/InvalidPlanInput.scala | 25 +
.../explain-results/alias_string.explain | 1 +
.../explain-results/alias_symbol.explain | 1 +
.../query-tests/explain-results/apply.explain | 2 +
.../query-tests/explain-results/as_string.explain | 1 +
.../query-tests/explain-results/as_symbol.explain | 1 +
.../query-tests/explain-results/coalesce.explain | 2 +
.../query-tests/explain-results/col.explain | 2 +
.../query-tests/explain-results/colRegex.explain | 2 +
.../query-tests/explain-results/crossJoin.explain | 3 +
.../query-tests/explain-results/describe.explain | 6 +
.../query-tests/explain-results/distinct.explain | 2 +
.../explain-results/dropDuplicates.explain | 2 +
.../dropDuplicates_names_array.explain | 2 +
.../dropDuplicates_names_seq.explain | 2 +
.../explain-results/dropDuplicates_varargs.explain | 2 +
.../explain-results/drop_multiple_column.explain | 2 +
.../explain-results/drop_multiple_strings.explain | 2 +
.../explain-results/drop_single_column.explain | 2 +
.../explain-results/drop_single_string.explain | 2 +
.../query-tests/explain-results/except.explain | 3 +
.../query-tests/explain-results/exceptAll.explain | 3 +
.../explain-results/filter_expr.explain | 2 +
.../explain-results/function_udf.explain | 2 +
.../query-tests/explain-results/hint.explain | 2 +
.../query-tests/explain-results/intersect.explain | 3 +
.../explain-results/intersectAll.explain | 3 +
.../explain-results/join_condition.explain | 3 +
.../explain-results/join_inner_condition.explain | 3 +
.../join_inner_no_condition.explain | 3 +
.../join_inner_using_multiple_col_array.explain | 3 +
.../join_inner_using_multiple_col_seq.explain | 3 +
.../join_inner_using_single_col.explain | 3 +
.../join_using_multiple_col_array.explain | 3 +
.../join_using_multiple_col_seq.explain | 3 +
.../explain-results/join_using_single_col.explain | 3 +
.../explain-results/melt_no_values.explain | 2 +
.../explain-results/melt_values.explain | 2 +
.../query-tests/explain-results/offset.explain | 2 +
.../explain-results/orderBy_columns.explain | 2 +
.../explain-results/orderBy_strings.explain | 2 +
.../explain-results/repartition.explain | 2 +
.../repartitionByRange_expressions.explain | 2 +
...itionByRange_num_partitions_expressions.explain | 2 +
.../repartition_expressions.explain | 2 +
.../repartition_num_partitions_expressions.explain | 2 +
.../explain-results/sample_fraction_seed.explain | 2 +
.../sample_withReplacement_fraction_seed.explain | 2 +
.../query-tests/explain-results/selectExpr.explain | 2 +
.../explain-results/select_strings.explain | 2 +
.../sortWithinPartitions_columns.explain | 2 +
.../sortWithinPartitions_strings.explain | 2 +
.../explain-results/sort_columns.explain | 2 +
.../explain-results/sort_strings.explain | 2 +
.../query-tests/explain-results/summary.explain | 5 +
.../query-tests/explain-results/to.explain | 2 +
.../query-tests/explain-results/toDF.explain | 2 +
.../query-tests/explain-results/union.explain | 3 +
.../query-tests/explain-results/unionAll.explain | 3 +
.../explain-results/unionByName.explain | 3 +
.../unionByName_allowMissingColumns.explain | 3 +
.../explain-results/unpivot_no_values.explain | 2 +
.../explain-results/unpivot_values.explain | 2 +
.../explain-results/where_column.explain | 2 +
.../query-tests/explain-results/where_expr.explain | 2 +
.../withColumnRenamed_java_map.explain | 2 +
.../withColumnRenamed_scala_map.explain | 2 +
.../withColumnRenamed_single.explain | 2 +
.../explain-results/withColumn_single.explain | 2 +
.../explain-results/withColumns_java_map.explain | 2 +
.../explain-results/withColumns_scala_map.explain | 2 +
.../explain-results/withMetadata.explain | 2 +
.../query-tests/queries/alias_string.json | 10 +
.../query-tests/queries/alias_string.proto.bin | 2 +
.../query-tests/queries/alias_symbol.json | 10 +
.../query-tests/queries/alias_symbol.proto.bin | 2 +
.../test/resources/query-tests/queries/apply.json | 14 +
.../resources/query-tests/queries/apply.proto.bin | 3 +
.../resources/query-tests/queries/as_string.json | 10 +
.../query-tests/queries/as_string.proto.bin | 2 +
.../resources/query-tests/queries/as_symbol.json | 10 +
.../query-tests/queries/as_symbol.proto.bin | 2 +
.../resources/query-tests/queries/coalesce.json | 11 +
.../query-tests/queries/coalesce.proto.bin | Bin 0 -> 45 bytes
.../test/resources/query-tests/queries/col.json | 18 +
.../resources/query-tests/queries/col.proto.bin | 4 +
.../resources/query-tests/queries/colRegex.json | 14 +
.../query-tests/queries/colRegex.proto.bin | 3 +
.../resources/query-tests/queries/crossJoin.json | 15 +
.../query-tests/queries/crossJoin.proto.bin | 2 +
.../resources/query-tests/queries/describe.json | 10 +
.../query-tests/queries/describe.proto.bin | 2 +
.../resources/query-tests/queries/distinct.json | 10 +
.../query-tests/queries/distinct.proto.bin | 2 +
.../query-tests/queries/dropDuplicates.json | 10 +
.../query-tests/queries/dropDuplicates.proto.bin | 2 +
.../queries/dropDuplicates_names_array.json | 10 +
.../queries/dropDuplicates_names_array.proto.bin | 2 +
.../queries/dropDuplicates_names_seq.json | 10 +
.../queries/dropDuplicates_names_seq.proto.bin | 2 +
.../queries/dropDuplicates_varargs.json | 10 +
.../queries/dropDuplicates_varargs.proto.bin | 2 +
.../query-tests/queries/drop_multiple_column.json | 18 +
.../queries/drop_multiple_column.proto.bin | 4 +
.../query-tests/queries/drop_multiple_strings.json | 22 +
.../queries/drop_multiple_strings.proto.bin | 5 +
.../query-tests/queries/drop_single_column.json | 14 +
.../queries/drop_single_column.proto.bin | 3 +
.../query-tests/queries/drop_single_string.json | 14 +
.../queries/drop_single_string.proto.bin | 3 +
.../test/resources/query-tests/queries/except.json | 16 +
.../resources/query-tests/queries/except.proto.bin | Bin 0 -> 82 bytes
.../resources/query-tests/queries/exceptAll.json | 16 +
.../query-tests/queries/exceptAll.proto.bin | 2 +
.../resources/query-tests/queries/filter_expr.json | 14 +
.../query-tests/queries/filter_expr.proto.bin | 3 +
.../query-tests/queries/function_udf.json | 96 +
.../query-tests/queries/function_udf.proto.bin | Bin 0 -> 11257 bytes
.../test/resources/query-tests/queries/hint.json | 15 +
.../resources/query-tests/queries/hint.proto.bin | 3 +
.../resources/query-tests/queries/intersect.json | 16 +
.../query-tests/queries/intersect.proto.bin | Bin 0 -> 82 bytes
.../query-tests/queries/intersectAll.json | 16 +
.../query-tests/queries/intersectAll.proto.bin | 2 +
.../query-tests/queries/join_condition.json | 29 +
.../query-tests/queries/join_condition.proto.bin | 5 +
.../query-tests/queries/join_inner_condition.json | 29 +
.../queries/join_inner_condition.proto.bin | 5 +
.../queries/join_inner_no_condition.json | 15 +
.../queries/join_inner_no_condition.proto.bin | 2 +
.../join_inner_using_multiple_col_array.json | 16 +
.../join_inner_using_multiple_col_array.proto.bin | 2 +
.../queries/join_inner_using_multiple_col_seq.json | 16 +
.../join_inner_using_multiple_col_seq.proto.bin | 2 +
.../queries/join_inner_using_single_col.json | 16 +
.../queries/join_inner_using_single_col.proto.bin | 2 +
.../queries/join_using_multiple_col_array.json | 16 +
.../join_using_multiple_col_array.proto.bin | 2 +
.../queries/join_using_multiple_col_seq.json | 16 +
.../queries/join_using_multiple_col_seq.proto.bin | 2 +
.../query-tests/queries/join_using_single_col.json | 16 +
.../queries/join_using_single_col.proto.bin | 2 +
.../query-tests/queries/melt_no_values.json | 19 +
.../query-tests/queries/melt_no_values.proto.bin | 4 +
.../resources/query-tests/queries/melt_values.json | 22 +
.../query-tests/queries/melt_values.proto.bin | 5 +
.../test/resources/query-tests/queries/offset.json | 10 +
.../resources/query-tests/queries/offset.proto.bin | 2 +
.../query-tests/queries/orderBy_columns.json | 35 +
.../query-tests/queries/orderBy_columns.proto.bin | Bin 0 -> 82 bytes
.../query-tests/queries/orderBy_strings.json | 35 +
.../query-tests/queries/orderBy_strings.proto.bin | Bin 0 -> 82 bytes
.../resources/query-tests/queries/repartition.json | 11 +
.../query-tests/queries/repartition.proto.bin | 2 +
.../queries/repartitionByRange_expressions.json | 30 +
.../repartitionByRange_expressions.proto.bin | 6 +
...artitionByRange_num_partitions_expressions.json | 31 +
...ionByRange_num_partitions_expressions.proto.bin | 6 +
.../queries/repartition_expressions.json | 18 +
.../queries/repartition_expressions.proto.bin | 4 +
.../repartition_num_partitions_expressions.json | 19 +
...epartition_num_partitions_expressions.proto.bin | 4 +
.../query-tests/queries/sample_fraction_seed.json | 12 +
.../queries/sample_fraction_seed.proto.bin | Bin 0 -> 56 bytes
.../sample_withReplacement_fraction_seed.json | 12 +
.../sample_withReplacement_fraction_seed.proto.bin | 3 +
.../resources/query-tests/queries/selectExpr.json | 18 +
.../query-tests/queries/selectExpr.proto.bin | 4 +
.../query-tests/queries/select_strings.json | 18 +
.../query-tests/queries/select_strings.proto.bin | 4 +
.../queries/sortWithinPartitions_columns.json | 27 +
.../queries/sortWithinPartitions_columns.proto.bin | Bin 0 -> 69 bytes
.../queries/sortWithinPartitions_strings.json | 27 +
.../queries/sortWithinPartitions_strings.proto.bin | Bin 0 -> 69 bytes
.../query-tests/queries/sort_columns.json | 27 +
.../query-tests/queries/sort_columns.proto.bin | Bin 0 -> 69 bytes
.../query-tests/queries/sort_strings.json | 27 +
.../query-tests/queries/sort_strings.proto.bin | Bin 0 -> 68 bytes
.../resources/query-tests/queries/summary.json | 10 +
.../query-tests/queries/summary.proto.bin | 2 +
.../src/test/resources/query-tests/queries/to.json | 28 +
.../resources/query-tests/queries/to.proto.bin | Bin 0 -> 69 bytes
.../test/resources/query-tests/queries/toDF.json | 10 +
.../resources/query-tests/queries/toDF.proto.bin | 2 +
.../test/resources/query-tests/queries/union.json | 16 +
.../resources/query-tests/queries/union.proto.bin | 2 +
.../resources/query-tests/queries/unionAll.json | 16 +
.../query-tests/queries/unionAll.proto.bin | 2 +
.../resources/query-tests/queries/unionByName.json | 18 +
.../query-tests/queries/unionByName.proto.bin | Bin 0 -> 92 bytes
.../queries/unionByName_allowMissingColumns.json | 18 +
.../unionByName_allowMissingColumns.proto.bin | 2 +
.../query-tests/queries/unpivot_no_values.json | 15 +
.../queries/unpivot_no_values.proto.bin | 3 +
.../query-tests/queries/unpivot_values.json | 26 +
.../query-tests/queries/unpivot_values.proto.bin | 6 +
.../query-tests/queries/where_column.json | 23 +
.../query-tests/queries/where_column.proto.bin | 5 +
.../resources/query-tests/queries/where_expr.json | 14 +
.../query-tests/queries/where_expr.proto.bin | 3 +
.../queries/withColumnRenamed_java_map.json | 13 +
.../queries/withColumnRenamed_java_map.proto.bin | 5 +
.../queries/withColumnRenamed_scala_map.json | 13 +
.../queries/withColumnRenamed_scala_map.proto.bin | 5 +
.../queries/withColumnRenamed_single.json | 12 +
.../queries/withColumnRenamed_single.proto.bin | 3 +
.../query-tests/queries/withColumn_single.json | 17 +
.../queries/withColumn_single.proto.bin | 4 +
.../query-tests/queries/withColumns_java_map.json | 24 +
.../queries/withColumns_java_map.proto.bin | 6 +
.../query-tests/queries/withColumns_scala_map.json | 24 +
.../queries/withColumns_scala_map.proto.bin | 7 +
.../query-tests/queries/withMetadata.json | 18 +
.../query-tests/queries/withMetadata.proto.bin | 4 +
.../org/apache/spark/sql/connect/dsl/package.scala | 5 +-
.../planner/LiteralValueProtoConverter.scala | 1 +
.../sql/connect/planner/SaveModeConverter.scala | 49 +
.../sql/connect/planner/SparkConnectPlanner.scala | 9 +-
.../sql/connect/service/SparkConnectService.scala | 3 +-
.../connect/planner/SparkConnectPlannerSuite.scala | 1 +
.../connect/planner/SparkConnectProtoSuite.scala | 7 +-
.../plugin/SparkConnectPluginRegistrySuite.scala | 3 +-
233 files changed, 4577 insertions(+), 85 deletions(-)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
index 1154b1e516c..1634c2d5b05 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Column.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import scala.collection.JavaConverters._
import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.Expression.SortOrder.NullOrdering
+import org.apache.spark.connect.proto.Expression.SortOrder.SortDirection
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Column.fn
import org.apache.spark.sql.connect.client.unsupported
@@ -95,6 +97,121 @@ class Column private[sql] (private[sql] val expr: proto.Expression) extends Logg
def name(alias: String): Column = Column { builder =>
builder.getAliasBuilder.addName(alias).setExpr(expr)
}
+
+ /**
+ * Returns a sort expression based on the descending order of the column.
+ * {{{
+ * // Scala
+ * df.sort(df("age").desc)
+ *
+ * // Java
+ * df.sort(df.col("age").desc());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def desc: Column = desc_nulls_last
+
+ /**
+ * Returns a sort expression based on the descending order of the column, and null values appear
+ * before non-null values.
+ * {{{
+ * // Scala: sort a DataFrame by age column in descending order and null values appearing first.
+ * df.sort(df("age").desc_nulls_first)
+ *
+ * // Java
+ * df.sort(df.col("age").desc_nulls_first());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def desc_nulls_first: Column =
+ buildSortOrder(SortDirection.SORT_DIRECTION_DESCENDING, NullOrdering.SORT_NULLS_FIRST)
+
+ /**
+ * Returns a sort expression based on the descending order of the column, and null values appear
+ * after non-null values.
+ * {{{
+ * // Scala: sort a DataFrame by age column in descending order and null values appearing last.
+ * df.sort(df("age").desc_nulls_last)
+ *
+ * // Java
+ * df.sort(df.col("age").desc_nulls_last());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def desc_nulls_last: Column =
+ buildSortOrder(SortDirection.SORT_DIRECTION_DESCENDING, NullOrdering.SORT_NULLS_LAST)
+
+ /**
+ * Returns a sort expression based on ascending order of the column.
+ * {{{
+ * // Scala: sort a DataFrame by age column in ascending order.
+ * df.sort(df("age").asc)
+ *
+ * // Java
+ * df.sort(df.col("age").asc());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def asc: Column = asc_nulls_first
+
+ /**
+ * Returns a sort expression based on ascending order of the column, and null values appear
+ * before non-null values.
+ * {{{
+ * // Scala: sort a DataFrame by age column in ascending order and null values appearing first.
+ * df.sort(df("age").asc_nulls_first)
+ *
+ * // Java
+ * df.sort(df.col("age").asc_nulls_first());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def asc_nulls_first: Column =
+ buildSortOrder(SortDirection.SORT_DIRECTION_ASCENDING, NullOrdering.SORT_NULLS_FIRST)
+
+ /**
+ * Returns a sort expression based on ascending order of the column, and null values appear
+ * after non-null values.
+ * {{{
+ * // Scala: sort a DataFrame by age column in ascending order and null values appearing last.
+ * df.sort(df("age").asc_nulls_last)
+ *
+ * // Java
+ * df.sort(df.col("age").asc_nulls_last());
+ * }}}
+ *
+ * @group expr_ops
+ * @since 3.4.0
+ */
+ def asc_nulls_last: Column =
+ buildSortOrder(SortDirection.SORT_DIRECTION_ASCENDING, NullOrdering.SORT_NULLS_LAST)
+
+ private def buildSortOrder(sortDirection: SortDirection, nullOrdering: NullOrdering): Column =
+ Column { builder =>
+ builder.getSortOrderBuilder
+ .setChild(expr)
+ .setDirection(sortDirection)
+ .setNullOrdering(nullOrdering)
+ }
+
+ private[sql] def sortOrder: proto.Expression.SortOrder = {
+ val base = if (expr.hasSortOrder) {
+ expr
+ } else {
+ asc.expr
+ }
+ base.getSortOrder
+ }
}
private[sql] object Column {
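The ordering helpers above feed into Dataset's sort/orderBy later in this commit via `sortOrder`. A minimal sketch of the intended usage (not taken from this patch; assumes a DataFrame `df` with an `age` column):

    df.sort(df("age").desc)           // descending; nulls last by default
    df.sort(df("age").asc_nulls_last) // ascending, but with nulls sorted last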
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 51b734d1daa..fd0c9320ab4 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -16,60 +16,2215 @@
*/
package org.apache.spark.sql
+import java.util.{Collections, Locale}
+
import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
+import org.apache.spark.sql.types.{Metadata, StructType}
+import org.apache.spark.util.Utils
+/**
+ * A Dataset is a strongly typed collection of domain-specific objects that can be transformed in
+ * parallel using functional or relational operations. Each Dataset also has an untyped view
+ * called a `DataFrame`, which is a Dataset of [[Row]].
+ *
+ * Operations available on Datasets are divided into transformations and actions. Transformations
+ * are the ones that produce new Datasets, and actions are the ones that trigger computation and
+ * return results. Example transformations include map, filter, select, and aggregate (`groupBy`).
+ * Example actions include count, show, and writing data out to file systems.
+ *
+ * Datasets are "lazy", i.e. computations are only triggered when an action is invoked.
+ * Internally, a Dataset represents a logical plan that describes the computation required to
+ * produce the data. When an action is invoked, Spark's query optimizer optimizes the logical plan
+ * and generates a physical plan for efficient execution in a parallel and distributed manner. To
+ * explore the logical plan as well as optimized physical plan, use the `explain` function.
+ *
+ * To efficiently support domain-specific objects, an [[Encoder]] is required. The encoder maps
+ * the domain specific type `T` to Spark's internal type system. For example, given a class
+ * `Person` with two fields, `name` (string) and `age` (int), an encoder is used to tell Spark to
+ * generate code at runtime to serialize the `Person` object into a binary structure. This binary
+ * structure often has much lower memory footprint as well as are optimized for efficiency in data
+ * processing (e.g. in a columnar format). To understand the internal binary representation for
+ * data, use the `schema` function.
+ *
+ * There are typically two ways to create a Dataset. The most common way is by pointing Spark to
+ * some files on storage systems, using the `read` function available on a `SparkSession`.
+ * {{{
+ * val people = spark.read.parquet("...").as[Person] // Scala
+ * Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java
+ * }}}
+ *
+ * Datasets can also be created through transformations available on existing Datasets. For
+ * example, the following creates a new Dataset by applying a filter on the existing one:
+ * {{{
+ * val names = people.map(_.name) // in Scala; names is a Dataset[String]
+ * Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING());
+ * }}}
+ *
+ * Dataset operations can also be untyped, through various domain-specific-language (DSL)
+ * functions defined in: Dataset (this class), [[Column]], and [[functions]]. These operations are
+ * very similar to the operations available in the data frame abstraction in R or Python.
+ *
+ * To select a column from the Dataset, use the `apply` method in Scala and `col` in Java.
+ * {{{
+ * val ageCol = people("age") // in Scala
+ * Column ageCol = people.col("age"); // in Java
+ * }}}
+ *
+ * Note that the [[Column]] type can also be manipulated through its various functions.
+ * {{{
+ * // The following creates a new column that increases everybody's age by 10.
+ * people("age") + 10 // in Scala
+ * people.col("age").plus(10); // in Java
+ * }}}
+ *
+ * A more concrete example in Scala:
+ * {{{
+ * // To create Dataset[Row] using SparkSession
+ * val people = spark.read.parquet("...")
+ * val department = spark.read.parquet("...")
+ *
+ * people.filter("age > 30")
+ * .join(department, people("deptId") === department("id"))
+ * .groupBy(department("name"), people("gender"))
+ * .agg(avg(people("salary")), max(people("age")))
+ * }}}
+ *
+ * and in Java:
+ * {{{
+ * // To create Dataset<Row> using SparkSession
+ * Dataset<Row> people = spark.read().parquet("...");
+ * Dataset<Row> department = spark.read().parquet("...");
+ *
+ * people.filter(people.col("age").gt(30))
+ * .join(department, people.col("deptId").equalTo(department.col("id")))
+ * .groupBy(department.col("name"), people.col("gender"))
+ * .agg(avg(people.col("salary")), max(people.col("age")));
+ * }}}
+ *
+ * @groupname basic Basic Dataset functions
+ * @groupname action Actions
+ * @groupname untypedrel Untyped transformations
+ * @groupname typedrel Typed transformations
+ *
+ * @since 3.4.0
+ */
class Dataset[T] private[sql] (val session: SparkSession, private[sql] val plan: proto.Plan)
extends Serializable {
+ override def toString: String = {
+ try {
+ val builder = new mutable.StringBuilder
+ val fields = schema.take(2).map { f =>
+ s"${f.name}: ${f.dataType.simpleString(2)}"
+ }
+ builder.append("[")
+ builder.append(fields.mkString(", "))
+ if (schema.length > 2) {
+ if (schema.length - fields.size == 1) {
+ builder.append(" ... 1 more field")
+ } else {
+ builder.append(" ... " + (schema.length - 2) + " more fields")
+ }
+ }
+ builder.append("]").toString()
+ } catch {
+ case NonFatal(e) =>
+ s"Invalid Dataframe; ${e.getMessage}"
+ }
+ }
+
/**
- * Selects a set of column based expressions.
+ * Converts this strongly typed collection of data to a generic `DataFrame`. In contrast to the
+ * strongly typed objects that Dataset operations work on, a `DataFrame` returns generic [[Row]]
+ * objects that allow fields to be accessed by ordinal or name.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def toDF(): DataFrame = {
+ // Note this will change as soon as we add the typed APIs.
+ this.asInstanceOf[Dataset[Row]]
+ }
+
+ /**
+ * Converts this strongly typed collection of data to a generic `DataFrame` with columns renamed.
+ * This can be quite convenient in conversion from an RDD of tuples into a `DataFrame` with
+ * meaningful names. For example:
* {{{
- * ds.select($"colA", $"colB" + 1)
+ * val rdd: RDD[(Int, String)] = ...
+ * rdd.toDF() // this implicit conversion creates a DataFrame with column name `_1` and `_2`
+ * rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
*
- * @group untypedrel
+ * @group basic
* @since 3.4.0
*/
@scala.annotation.varargs
- def select(cols: Column*): DataFrame = session.newDataset { builder =>
- builder.getProjectBuilder
+ def toDF(colNames: String*): DataFrame = session.newDataset { builder =>
+ builder.getToDfBuilder
.setInput(plan.getRoot)
- .addAllExpressions(cols.map(_.expr).asJava)
+ .addAllColumnNames(colNames.asJava)
}
/**
- * Filters rows using the given condition.
+ * Returns a new DataFrame where each row is reconciled to match the specified schema. Spark
+ * will: <ul> <li>Reorder columns and/or inner fields by name to match the specified
+ * schema.</li> <li>Project away columns and/or inner fields that are not needed by the
+ * specified schema. Missing columns and/or inner fields (present in the specified schema but
+ * not input DataFrame) lead to failures.</li> <li>Cast the columns and/or inner fields to match
+ * the data types in the specified schema, if the types are compatible, e.g., numeric to numeric
+ * (error if overflows), but not string to int.</li> <li>Carry over the metadata from the
+ * specified schema, while the columns and/or inner fields still keep their own metadata if not
+ * overwritten by the specified schema.</li> <li>Fail if the nullability is not compatible. For
+ * example, the column and/or inner field is nullable but the specified schema requires them to
+ * be not nullable.</li> </ul>
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def to(schema: StructType): DataFrame = session.newDataset { builder =>
+ builder.getToSchemaBuilder
+ .setInput(plan.getRoot)
+ .setSchema(DataTypeProtoConverter.toConnectProtoType(schema))
+ }
+
+ /**
+ * Returns the schema of this Dataset.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def schema: StructType = {
+ DataTypeProtoConverter.toCatalystType(analyze.getSchema).asInstanceOf[StructType]
+ }
+
+ /**
+ * Prints the schema to the console in a nice tree format.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def printSchema(): Unit = printSchema(Int.MaxValue)
+
+ // scalastyle:off println
+ /**
+ * Prints the schema up to the given level to the console in a nice tree format.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def printSchema(level: Int): Unit = println(schema.treeString(level))
+ // scalastyle:on println
+
+ /**
+ * Prints the plans (logical and physical) with a format specified by a given explain mode.
+ *
+ * @param mode
+ * specifies the expected output format of plans. <ul> <li>`simple`: Print only a physical
+ * plan.</li> <li>`extended`: Print both logical and physical plans.</li> <li>`codegen`: Print
+ * a physical plan and generated codes if they are available.</li> <li>`cost`: Print a logical
+ * plan and statistics if they are available.</li> <li>`formatted`: Split explain output into
+ * two sections: a physical plan outline and node details.</li> </ul>
+ * @group basic
+ * @since 3.4.0
+ */
+ def explain(mode: String): Unit = {
+ val protoMode = mode.trim.toLowerCase(Locale.ROOT) match {
+ case "simple" => proto.Explain.ExplainMode.SIMPLE
+ case "extended" => proto.Explain.ExplainMode.EXTENDED
+ case "codegen" => proto.Explain.ExplainMode.CODEGEN
+ case "cost" => proto.Explain.ExplainMode.COST
+ case "formatted" => proto.Explain.ExplainMode.FORMATTED
+ case _ => throw new IllegalArgumentException("Unsupported explain mode: " + mode)
+ }
+ explain(protoMode)
+ }
+
+ /**
+ * Prints the plans (logical and physical) to the console for debugging purposes.
+ *
+ * @param extended
+ * default `false`. If `false`, prints only the physical plan.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def explain(extended: Boolean): Unit = {
+ val mode = if (extended) {
+ proto.Explain.ExplainMode.EXTENDED
+ } else {
+ proto.Explain.ExplainMode.SIMPLE
+ }
+ explain(mode)
+ }
+
+ /**
+ * Prints the physical plan to the console for debugging purposes.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def explain(): Unit = explain(proto.Explain.ExplainMode.SIMPLE)
+
+ private def explain(mode: proto.Explain.ExplainMode): Unit = {
+ // scalastyle:off println
+ println(session.analyze(plan, mode).getExplainString)
+ // scalastyle:on println
+ }
+
+ /**
+ * Returns all column names and their data types as an array.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def dtypes: Array[(String, String)] = schema.fields.map { field =>
+ (field.name, field.dataType.toString)
+ }
+
+ /**
+ * Returns all column names as an array.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def columns: Array[String] = schema.fields.map(_.name)
+
+ /**
+ * Returns true if the `collect` and `take` methods can be run locally (without any Spark
+ * executors).
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def isLocal: Boolean = analyze.getIsLocal
+
+ /**
+ * Returns true if the `Dataset` is empty.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def isEmpty: Boolean = select().limit(1).withResult { result =>
+ result.length == 0
+ }
+
+ /**
+ * Returns true if this Dataset contains one or more sources that continuously return data as it
+ * arrives. A Dataset that reads data from a streaming source must be executed as a
+ * `StreamingQuery` using the `start()` method in `DataStreamWriter`.
+ *
+ * @group streaming
+ * @since 3.4.0
+ */
+ def isStreaming: Boolean = analyze.getIsStreaming
+
+ /**
+ * Displays the Dataset in a tabular form. Strings with more than 20 characters will be truncated,
+ * and all cells will be aligned right. For example:
* {{{
- * // The following are equivalent:
- * peopleDs.filter($"age" > 15)
- * peopleDs.where($"age" > 15)
+ * year month AVG('Adj Close) MAX('Adj Close)
+ * 1980 12 0.503218 0.595103
+ * 1981 01 0.523289 0.570307
+ * 1982 02 0.436504 0.475256
+ * 1983 03 0.410516 0.442194
+ * 1984 04 0.450090 0.483521
* }}}
*
- * @group typedrel
+ * @param numRows
+ * Number of rows to show
+ *
+ * @group action
* @since 3.4.0
*/
- def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
- builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
+ def show(numRows: Int): Unit = show(numRows, truncate = true)
+
+ /**
+ * Displays the top 20 rows of the Dataset in a tabular form. Strings with more than 20
+ * characters will be truncated, and all cells will be aligned right.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def show(): Unit = show(20)
+
+ /**
+ * Displays the top 20 rows of the Dataset in a tabular form.
+ *
+ * @param truncate
+ * Whether to truncate long strings. If true, strings with more than 20 characters will be
+ * truncated and all cells will be aligned right.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def show(truncate: Boolean): Unit = show(20, truncate)
+
+ /**
+ * Displays the Dataset in a tabular form. For example:
+ * {{{
+ * year month AVG('Adj Close) MAX('Adj Close)
+ * 1980 12 0.503218 0.595103
+ * 1981 01 0.523289 0.570307
+ * 1982 02 0.436504 0.475256
+ * 1983 03 0.410516 0.442194
+ * 1984 04 0.450090 0.483521
+ * }}}
+ * @param numRows
+ * Number of rows to show
+ * @param truncate
+ * Whether to truncate long strings. If true, strings with more than 20 characters will be
+ * truncated and all cells will be aligned right.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ // scalastyle:off println
+ def show(numRows: Int, truncate: Boolean): Unit = {
+ val truncateValue = if (truncate) 20 else 0
+ show(numRows, truncateValue, vertical = false)
}
/**
- * Returns a new Dataset by taking the first `n` rows. The difference between this function and
- * `head` is that `head` is an action and returns an array (by triggering query execution) while
- * `limit` returns a new Dataset.
+ * Displays the Dataset in a tabular form. For example:
+ * {{{
+ * year month AVG('Adj Close) MAX('Adj Close)
+ * 1980 12 0.503218 0.595103
+ * 1981 01 0.523289 0.570307
+ * 1982 02 0.436504 0.475256
+ * 1983 03 0.410516 0.442194
+ * 1984 04 0.450090 0.483521
+ * }}}
*
- * @group typedrel
+ * @param numRows
+ * Number of rows to show
+ * @param truncate
+ * If set to more than 0, truncates strings to `truncate` characters and all cells will be
+ * aligned right.
+ * @group action
* @since 3.4.0
*/
- def limit(n: Int): Dataset[T] = session.newDataset { builder =>
- builder.getLimitBuilder
- .setInput(plan.getRoot)
- .setLimit(n)
+ def show(numRows: Int, truncate: Int): Unit = show(numRows, truncate, vertical = false)
+
+ /**
+ * Displays the Dataset in a tabular form. For example:
+ * {{{
+ * year month AVG('Adj Close) MAX('Adj Close)
+ * 1980 12 0.503218 0.595103
+ * 1981 01 0.523289 0.570307
+ * 1982 02 0.436504 0.475256
+ * 1983 03 0.410516 0.442194
+ * 1984 04 0.450090 0.483521
+ * }}}
+ *
+ * If `vertical` is enabled, this command prints output rows vertically (one line per column
+ * value):
+ *
+ * {{{
+ * -RECORD 0-------------------
+ * year | 1980
+ * month | 12
+ * AVG('Adj Close) | 0.503218
+ * MAX('Adj Close) | 0.595103
+ * -RECORD 1-------------------
+ * year | 1981
+ * month | 01
+ * AVG('Adj Close) | 0.523289
+ * MAX('Adj Close) | 0.570307
+ * -RECORD 2-------------------
+ * year | 1982
+ * month | 02
+ * AVG('Adj Close) | 0.436504
+ * MAX('Adj Close) | 0.475256
+ * -RECORD 3-------------------
+ * year | 1983
+ * month | 03
+ * AVG('Adj Close) | 0.410516
+ * MAX('Adj Close) | 0.442194
+ * -RECORD 4-------------------
+ * year | 1984
+ * month | 04
+ * AVG('Adj Close) | 0.450090
+ * MAX('Adj Close) | 0.483521
+ * }}}
+ *
+ * @param numRows
+ * Number of rows to show
+ * @param truncate
+ * If set to more than 0, truncates strings to `truncate` characters and all cells will be
+ * aligned right.
+ * @param vertical
+ * If set to true, prints output rows vertically (one line per column value).
+ * @group action
+ * @since 3.4.0
+ */
+ def show(numRows: Int, truncate: Int, vertical: Boolean): Unit = {
+ val df = session.newDataset { builder =>
+ builder.getShowStringBuilder
+ .setInput(plan.getRoot)
+ .setNumRows(numRows)
+ .setTruncate(truncate)
+ .setVertical(vertical)
+ }
+ df.withResult { result =>
+ assert(result.length == 1)
+ assert(result.schema.size == 1)
+ // scalastyle:off println
+ println(result.toArray.head.getString(0))
+ // scalastyle:on println
+ }
+ }
+
+ private def buildJoin(right: Dataset[_])(f: proto.Join.Builder => Unit): DataFrame = {
+ session.newDataset { builder =>
+ val joinBuilder = builder.getJoinBuilder
+ joinBuilder.setLeft(plan.getRoot).setRight(right.plan.getRoot)
+ f(joinBuilder)
+ }
}
- private[sql] def analyze: proto.AnalyzePlanResponse = session.analyze(plan)
+ private def toJoinType(name: String): proto.Join.JoinType = {
+ name.trim.toLowerCase(Locale.ROOT) match {
+ case "inner" =>
+ proto.Join.JoinType.JOIN_TYPE_INNER
+ case "cross" =>
+ proto.Join.JoinType.JOIN_TYPE_CROSS
+ case "outer" | "full" | "fullouter" | "full_outer" =>
+ proto.Join.JoinType.JOIN_TYPE_FULL_OUTER
+ case "left" | "leftouter" | "left_outer" =>
+ proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER
+ case "right" | "rightouter" | "right_outer" =>
+ proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER
+ case "semi" | "leftsemi" | "left_semi" =>
+ proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI
+ case "anti" | "leftanti" | "left_anti" =>
+ proto.Join.JoinType.JOIN_TYPE_LEFT_ANTI
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported join type `$name`.")
+ }
+ }
- def collectResult(): SparkResult = session.execute(plan)
+ /**
+ * Join with another `DataFrame`.
+ *
+ * Behaves as an INNER JOIN and requires a subsequent join predicate.
+ *
+ * @param right
+ * Right side of the join operation.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_]): DataFrame = buildJoin(right) { builder =>
+ builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_INNER)
+ }
+
+ /**
+ * Inner equi-join with another `DataFrame` using the given column.
+ *
+ * Different from other join functions, the join column will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * {{{
+ * // Joining df1 and df2 using the column "user_id"
+ * df1.join(df2, "user_id")
+ * }}}
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumn
+ * Name of the column to join on. This column must exist on both sides.
+ *
+ * @note
+ * If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+ * will NOT be able to reference any columns after the join, since there is no way to
+ * disambiguate which side of the join you would like to reference.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumn: String): DataFrame = {
+ join(right, Seq(usingColumn))
+ }
+
+ /**
+ * (Java-specific) Inner equi-join with another `DataFrame` using the given columns. See the
+ * Scala-specific overload for more details.
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumns
+ * Names of the columns to join on. These columns must exist on both sides.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumns: Array[String]): DataFrame = {
+ join(right, usingColumns.toSeq)
+ }
+
+ /**
+ * (Scala-specific) Inner equi-join with another `DataFrame` using the given columns.
+ *
+ * Different from other join functions, the join columns will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * {{{
+ * // Joining df1 and df2 using the columns "user_id" and "user_name"
+ * df1.join(df2, Seq("user_id", "user_name"))
+ * }}}
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumns
+ * Names of the columns to join on. These columns must exist on both sides.
+ *
+ * @note
+ * If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+ * will NOT be able to reference any columns after the join, since there is no way to
+ * disambiguate which side of the join you would like to reference.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumns: Seq[String]): DataFrame = {
+ join(right, usingColumns, "inner")
+ }
+
+ /**
+ * Equi-join with another `DataFrame` using the given column. A cross join with a predicate is
+ * specified as an inner join. If you would explicitly like to perform a cross join use the
+ * `crossJoin` method.
+ *
+ * Different from other join functions, the join column will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumn
+ * Name of the column to join on. This column must exist on both sides.
+ * @param joinType
+ * Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+ * `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+ * `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+ * `left_anti`.
+ *
+ * @note
+ * If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+ * will NOT be able to reference any columns after the join, since there is no way to
+ * disambiguate which side of the join you would like to reference.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumn: String, joinType: String): DataFrame = {
+ join(right, Seq(usingColumn), joinType)
+ }
+
+ /**
+ * (Java-specific) Equi-join with another `DataFrame` using the given columns. See the
+ * Scala-specific overload for more details.
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumns
+ * Names of the columns to join on. These columns must exist on both sides.
+ * @param joinType
+ * Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+ * `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+ * `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+ * `left_anti`.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumns: Array[String], joinType: String): DataFrame = {
+ join(right, usingColumns.toSeq, joinType)
+ }
+
+ /**
+ * (Scala-specific) Equi-join with another `DataFrame` using the given columns. A cross join
+ * with a predicate is specified as an inner join. If you would explicitly like to perform a
+ * cross join use the `crossJoin` method.
+ *
+ * Different from other join functions, the join columns will only appear once in the output,
+ * i.e. similar to SQL's `JOIN USING` syntax.
+ *
+ * @param right
+ * Right side of the join operation.
+ * @param usingColumns
+ * Names of the columns to join on. These columns must exist on both sides.
+ * @param joinType
+ * Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+ * `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+ * `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+ * `left_anti`.
+ *
+ * @note
+ * If you perform a self-join using this function without aliasing the input `DataFrame`s, you
+ * will NOT be able to reference any columns after the join, since there is no way to
+ * disambiguate which side of the join you would like to reference.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
+ buildJoin(right) { builder =>
+ builder
+ .setJoinType(toJoinType(joinType))
+ .addAllUsingColumns(usingColumns.asJava)
+ }
+ }
+
+ /**
+ * Inner join with another `DataFrame`, using the given join expression.
+ *
+ * {{{
+ * // The following two are equivalent:
+ * df1.join(df2, $"df1Key" === $"df2Key")
+ * df1.join(df2).where($"df1Key" === $"df2Key")
+ * }}}
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], joinExprs: Column): DataFrame = join(right, joinExprs, "inner")
+
+ /**
+ * Join with another `DataFrame`, using the given join expression. The following performs a full
+ * outer join between `df1` and `df2`.
+ *
+ * {{{
+ * // Scala:
+ * import org.apache.spark.sql.functions._
+ * df1.join(df2, $"df1Key" === $"df2Key", "outer")
+ *
+ * // Java:
+ * import static org.apache.spark.sql.functions.*;
+ * df1.join(df2, col("df1Key").equalTo(col("df2Key")), "outer");
+ * }}}
+ *
+ * @param right
+ * Right side of the join.
+ * @param joinExprs
+ * Join expression.
+ * @param joinType
+ * Type of join to perform. Default `inner`. Must be one of: `inner`, `cross`, `outer`,
+ * `full`, `fullouter`, `full_outer`, `left`, `leftouter`, `left_outer`, `right`,
+ * `rightouter`, `right_outer`, `semi`, `leftsemi`, `left_semi`, `anti`, `leftanti`,
+ * `left_anti`.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
+ buildJoin(right) { builder =>
+ builder
+ .setJoinType(toJoinType(joinType))
+ .setJoinCondition(joinExprs.expr)
+ }
+ }
+
+ /**
+ * Explicit cartesian join with another `DataFrame`.
+ *
+ * @param right
+ * Right side of the join operation.
+ *
+ * @note
+ * Cartesian joins are very expensive without an extra filter that can be pushed down.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def crossJoin(right: Dataset[_]): DataFrame = buildJoin(right) { builder =>
+ builder.setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS)
+ }
+
+ private def buildSort(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
+ session.newDataset { builder =>
+ builder.getSortBuilder
+ .setInput(plan.getRoot)
+ .setIsGlobal(global)
+ .addAllOrder(sortExprs.map(_.sortOrder).asJava)
+ }
+ }
+
+ /**
+ * Returns a new Dataset with each partition sorted by the given expressions.
+ *
+ * This is the same operation as "SORT BY" in SQL (Hive QL).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] = {
+ sortWithinPartitions((sortCol +: sortCols).map(Column(_)): _*)
+ }
+
+ /**
+ * Returns a new Dataset with each partition sorted by the given expressions.
+ *
+ * This is the same operation as "SORT BY" in SQL (Hive QL).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def sortWithinPartitions(sortExprs: Column*): Dataset[T] = {
+ buildSort(global = false, sortExprs)
+ }
+
+ /**
+ * Returns a new Dataset sorted by the specified column, all in ascending order.
+ * {{{
+ * // The following 3 are equivalent
+ * ds.sort("sortcol")
+ * ds.sort($"sortcol")
+ * ds.sort($"sortcol".asc)
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def sort(sortCol: String, sortCols: String*): Dataset[T] = {
+ sort((sortCol +: sortCols).map(Column(_)): _*)
+ }
+
+ /**
+ * Returns a new Dataset sorted by the given expressions. For example:
+ * {{{
+ * ds.sort($"col1", $"col2".desc)
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def sort(sortExprs: Column*): Dataset[T] = {
+ buildSort(global = true, sortExprs)
+ }
+
+ /**
+ * Returns a new Dataset sorted by the given expressions. This is an alias of the `sort`
+ * function.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(sortCol: String, sortCols: String*): Dataset[T] = sort(sortCol, sortCols: _*)
+
+ /**
+ * Returns a new Dataset sorted by the given expressions. This is an alias of the `sort`
+ * function.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def orderBy(sortExprs: Column*): Dataset[T] = sort(sortExprs: _*)
+
+ /**
+ * Selects a column based on the column name and returns it as a [[Column]].
+ *
+ * @note
+ * The column name can also reference a nested column like `a.b`.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def apply(colName: String): Column = col(colName)
+
+ /**
+ * Specifies some hint on the current Dataset. As an example, the following code specifies that
+ * one of the plans can be broadcast:
+ *
+ * {{{
+ * df1.join(df2.hint("broadcast"))
+ * }}}
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def hint(name: String, parameters: Any*): Dataset[T] = session.newDataset { builder =>
+ builder.getHintBuilder
+ .setInput(plan.getRoot)
+ .setName(name)
+ .addAllParameters(parameters.map(p => functions.lit(p).expr).asJava)
+ }
+
+ /**
+ * Selects a column based on the column name and returns it as a [[Column]].
+ *
+ * @note
+ * The column name can also reference a nested column like `a.b`.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def col(colName: String): Column = functions.col(colName)
+
+ /**
+ * Selects a column based on the column name specified as a regex and returns it as a [[Column]].
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def colRegex(colName: String): Column = Column { builder =>
+ builder.getUnresolvedRegexBuilder.setColName(colName)
+ }
+
+ /**
+ * Returns a new Dataset with an alias set.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def as(alias: String): Dataset[T] = session.newDataset { builder =>
+ builder.getSubqueryAliasBuilder
+ .setInput(plan.getRoot)
+ .setAlias(alias)
+ }
+
+ /**
+ * (Scala-specific) Returns a new Dataset with an alias set.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def as(alias: Symbol): Dataset[T] = as(alias.name)
+
+ /**
+ * Returns a new Dataset with an alias set. Same as `as`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def alias(alias: String): Dataset[T] = as(alias)
+
+ /**
+ * (Scala-specific) Returns a new Dataset with an alias set. Same as `as`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def alias(alias: Symbol): Dataset[T] = as(alias)
+
+ /**
+ * Selects a set of column-based expressions.
+ * {{{
+ * ds.select($"colA", $"colB" + 1)
+ * }}}
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def select(cols: Column*): DataFrame = session.newDataset { builder =>
+ builder.getProjectBuilder
+ .setInput(plan.getRoot)
+ .addAllExpressions(cols.map(_.expr).asJava)
+ }
+
+ /**
+ * Selects a set of columns. This is a variant of `select` that can only select existing columns
+ * using column names (i.e. cannot construct expressions).
+ *
+ * {{{
+ * // The following two are equivalent:
+ * ds.select("colA", "colB")
+ * ds.select($"colA", $"colB")
+ * }}}
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def select(col: String, cols: String*): DataFrame = select((col +: cols).map(Column(_)): _*)
+
+ /**
+ * Selects a set of SQL expressions. This is a variant of `select` that accepts SQL expressions.
+ *
+ * {{{
+ * // The following are equivalent:
+ * ds.selectExpr("colA", "colB as newName", "abs(colC)")
+ * ds.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
+ * }}}
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def selectExpr(exprs: String*): DataFrame = {
+ select(exprs.map(functions.expr): _*)
+ }
+
+ /**
+ * Filters rows using the given condition.
+ * {{{
+ * // The following are equivalent:
+ * peopleDs.filter($"age" > 15)
+ * peopleDs.where($"age" > 15)
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def filter(condition: Column): Dataset[T] = session.newDataset { builder =>
+ builder.getFilterBuilder.setInput(plan.getRoot).setCondition(condition.expr)
+ }
+
+ /**
+ * Filters rows using the given SQL expression.
+ * {{{
+ * peopleDs.filter("age > 15")
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def filter(conditionExpr: String): Dataset[T] = filter(functions.expr(conditionExpr))
+
+ /**
+ * Filters rows using the given condition. This is an alias for `filter`.
+ * {{{
+ * // The following are equivalent:
+ * peopleDs.filter($"age" > 15)
+ * peopleDs.where($"age" > 15)
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def where(condition: Column): Dataset[T] = filter(condition)
+
+ /**
+ * Filters rows using the given SQL expression.
+ * {{{
+ * peopleDs.where("age > 15")
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def where(conditionExpr: String): Dataset[T] = filter(conditionExpr)
+
+ private def buildUnpivot(
+ ids: Array[Column],
+ valuesOption: Option[Array[Column]],
+ variableColumnName: String,
+ valueColumnName: String): DataFrame = session.newDataset { builder =>
+ val unpivot = builder.getUnpivotBuilder
+ .setInput(plan.getRoot)
+ .addAllIds(ids.toSeq.map(_.expr).asJava)
+ .setVariableColumnName(variableColumnName)
+ .setValueColumnName(valueColumnName)
+ valuesOption.foreach { values =>
+ unpivot.getValuesBuilder
+ .addAllValues(values.toSeq.map(_.expr).asJava)
+ }
+ }
+
+ /**
+ * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ * set. This is the reverse of `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ * which cannot be reversed.
+ *
+ * This function is useful to massage a DataFrame into a format where some columns are
+ * identifier columns ("ids"), while all other columns ("values") are "unpivoted" to the rows,
+ * leaving just two non-id columns, named as given by `variableColumnName` and
+ * `valueColumnName`.
+ *
+ * {{{
+ * val df = Seq((1, 11, 12L), (2, 21, 22L)).toDF("id", "int", "long")
+ * df.show()
+ * // output:
+ * // +---+---+----+
+ * // | id|int|long|
+ * // +---+---+----+
+ * // | 1| 11| 12|
+ * // | 2| 21| 22|
+ * // +---+---+----+
+ *
+ * df.unpivot(Array($"id"), Array($"int", $"long"), "variable", "value").show()
+ * // output:
+ * // +---+--------+-----+
+ * // | id|variable|value|
+ * // +---+--------+-----+
+ * // | 1| int| 11|
+ * // | 1| long| 12|
+ * // | 2| int| 21|
+ * // | 2| long| 22|
+ * // +---+--------+-----+
+ * // schema:
+ * //root
+ * // |-- id: integer (nullable = false)
+ * // |-- variable: string (nullable = false)
+ * // |-- value: long (nullable = true)
+ * }}}
+ *
+ * When no "id" columns are given, the unpivoted DataFrame consists of only the "variable" and
+ * "value" columns.
+ *
+ * All "value" columns must share a least common data type. Unless they are the same data type,
+ * all "value" columns are cast to the nearest common data type. For instance, types
+ * `IntegerType` and `LongType` are cast to `LongType`, while `IntegerType` and `StringType` do
+ * not have a common data type and `unpivot` fails with an `AnalysisException`.
+ *
+ * @param ids
+ * Id columns
+ * @param values
+ * Value columns to unpivot
+ * @param variableColumnName
+ * Name of the variable column
+ * @param valueColumnName
+ * Name of the value column
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def unpivot(
+ ids: Array[Column],
+ values: Array[Column],
+ variableColumnName: String,
+ valueColumnName: String): DataFrame = {
+ buildUnpivot(ids, Option(values), variableColumnName, valueColumnName)
+ }
+
+ /**
+ * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ * set. This is the reverse of `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ * which cannot be reversed.
+ *
+ * @see
+ * `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+ *
+ * This is equivalent to calling `Dataset#unpivot(Array, Array, String, String)` where `values`
+ * is set to all non-id columns that exist in the DataFrame.
+ *
+ * @param ids
+ * Id columns
+ * @param variableColumnName
+ * Name of the variable column
+ * @param valueColumnName
+ * Name of the value column
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def unpivot(
+ ids: Array[Column],
+ variableColumnName: String,
+ valueColumnName: String): DataFrame = {
+ buildUnpivot(ids, None, variableColumnName, valueColumnName)
+ }
+
+ /**
+ * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ * set. This is the reverse of `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ * which cannot be reversed. This is an alias for `unpivot`.
+ *
+ * @see
+ * `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+ *
+ * @param ids
+ * Id columns
+ * @param values
+ * Value columns to unpivot
+ * @param variableColumnName
+ * Name of the variable column
+ * @param valueColumnName
+ * Name of the value column
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def melt(
+ ids: Array[Column],
+ values: Array[Column],
+ variableColumnName: String,
+ valueColumnName: String): DataFrame =
+ unpivot(ids, values, variableColumnName, valueColumnName)
+
+ /**
+ * Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ * set. This is the reverse of `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ * which cannot be reversed. This is an alias for `unpivot`.
+ *
+ * @see
+ * `org.apache.spark.sql.Dataset.unpivot(Array, Array, String, String)`
+ *
+ * This is equivalent to calling `Dataset#unpivot(Array, Array, String, String)` where `values`
+ * is set to all non-id columns that exist in the DataFrame.
+ *
+ * @param ids
+ * Id columns
+ * @param variableColumnName
+ * Name of the variable column
+ * @param valueColumnName
+ * Name of the value column
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def melt(ids: Array[Column], variableColumnName: String, valueColumnName: String): DataFrame =
+ unpivot(ids, variableColumnName, valueColumnName)
+
+ /**
+ * Returns a new Dataset by taking the first `n` rows. The difference between this function and
+ * `head` is that `head` is an action and returns an array (by triggering query execution) while
+ * `limit` returns a new Dataset.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def limit(n: Int): Dataset[T] = session.newDataset { builder =>
+ builder.getLimitBuilder
+ .setInput(plan.getRoot)
+ .setLimit(n)
+ }
+
+ /**
+ * Returns a new Dataset by skipping the first `n` rows.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def offset(n: Int): Dataset[T] = session.newDataset { builder =>
+ builder.getOffsetBuilder
+ .setInput(plan.getRoot)
+ .setOffset(n)
+ }
+
+ private def buildSetOp(right: Dataset[T], setOpType: proto.SetOperation.SetOpType)(
+ f: proto.SetOperation.Builder => Unit): Dataset[T] = {
+ session.newDataset { builder =>
+ f(
+ builder.getSetOpBuilder
+ .setSetOpType(setOpType)
+ .setLeftInput(plan.getRoot)
+ .setRightInput(right.plan.getRoot))
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
+ *
+ * This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union (that does
+ * deduplication of elements), use this function followed by a [[distinct]].
+ *
+ * Also as standard in SQL, this function resolves columns by position (not by name):
+ *
+ * {{{
+ * val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+ * val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+ * df1.union(df2).show
+ *
+ * // output:
+ * // +----+----+----+
+ * // |col0|col1|col2|
+ * // +----+----+----+
+ * // |   1|   2|   3|
+ * // |   4|   5|   6|
+ * // +----+----+----+
+ * }}}
+ *
+ * Notice that the column positions in the schema aren't necessarily matched with the fields in
+ * the strongly typed objects in a Dataset. This function resolves columns by their positions in
+ * the schema, not the fields in the strongly typed objects. Use [[unionByName]] to resolve
+ * columns by field name in the typed objects.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def union(other: Dataset[T]): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_UNION) { builder =>
+ builder.setIsAll(true)
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing the union of rows in this Dataset and another Dataset. This is
+ * an alias for `union`.
+ *
+ * This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union (that does
+ * deduplication of elements), use this function followed by a [[distinct]].
+ *
+ * Also as standard in SQL, this function resolves columns by position (not by name).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def unionAll(other: Dataset[T]): Dataset[T] = union(other)
+
+ /**
+ * Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
+ *
+ * This is different from both `UNION ALL` and `UNION DISTINCT` in SQL. To do a SQL-style set
+ * union (that does deduplication of elements), use this function followed by a [[distinct]].
+ *
+ * The difference between this function and [[union]] is that this function resolves columns by
+ * name (not by position):
+ *
+ * {{{
+ * val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+ * val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
+ * df1.unionByName(df2).show
+ *
+ * // output:
+ * // +----+----+----+
+ * // |col0|col1|col2|
+ * // +----+----+----+
+ * // |   1|   2|   3|
+ * // |   6|   4|   5|
+ * // +----+----+----+
+ * }}}
+ *
+ * Note that this supports nested columns in struct and array types. Nested columns in map types
+ * are not currently supported.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def unionByName(other: Dataset[T]): Dataset[T] = unionByName(other, allowMissingColumns = false)
+
+ /**
+ * Returns a new Dataset containing the union of rows in this Dataset and another Dataset.
+ *
+ * The difference between this function and [[union]] is that this function resolves columns by
+ * name (not by position).
+ *
+ * When the parameter `allowMissingColumns` is `true`, the set of column names in this and other
+ * `Dataset` can differ; missing columns will be filled with null. Further, the missing columns
+ * of this `Dataset` will be added at the end of the schema of the union result:
+ *
+ * {{{
+ * val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
+ * val df2 = Seq((4, 5, 6)).toDF("col1", "col0", "col3")
+ * df1.unionByName(df2, true).show
+ *
+ * // output: "col3" is missing at left df1 and added at the end of schema.
+ * // +----+----+----+----+
+ * // |col0|col1|col2|col3|
+ * // +----+----+----+----+
+ * // |   1|   2|   3|null|
+ * // |   5|   4|null|   6|
+ * // +----+----+----+----+
+ *
+ * df2.unionByName(df1, true).show
+ *
+ * // output: "col2" is missing at left df2 and added at the end of schema.
+ * // +----+----+----+----+
+ * // |col1|col0|col3|col2|
+ * // +----+----+----+----+
+ * // |   4|   5|   6|null|
+ * // |   2|   1|null|   3|
+ * // +----+----+----+----+
+ * }}}
+ *
+ * Note that this supports nested columns in struct and array types. With `allowMissingColumns`,
+ * missing nested columns of struct columns with the same name will also be filled with null
+ * values and added to the end of struct. Nested columns in map types are not currently
+ * supported.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_UNION) { builder =>
+ builder.setByName(true).setIsAll(true).setAllowMissingColumns(allowMissingColumns)
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing rows only in both this Dataset and another Dataset. This is
+ * equivalent to `INTERSECT` in SQL.
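+ *
+ * A sketch:
+ * {{{
+ *   val a = spark.range(1, 5) // 1, 2, 3, 4
+ *   val b = spark.range(3, 7) // 3, 4, 5, 6
+ *   a.intersect(b) // 3, 4
+ * }}}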
+ *
+ * @note
+ * Equality checking is performed directly on the encoded representation of the data and thus
+ * is not affected by a custom `equals` function defined on `T`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def intersect(other: Dataset[T]): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT) { builder =>
+ builder.setIsAll(false)
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing rows only in both this Dataset and another Dataset while
+ * preserving the duplicates. This is equivalent to `INTERSECT ALL` in SQL.
+ *
+ * @note
+ * Equality checking is performed directly on the encoded representation of the data and thus
+ * is not affected by a custom `equals` function defined on `T`. Also as standard in SQL, this
+ * function resolves columns by position (not by name).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def intersectAll(other: Dataset[T]): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_INTERSECT) { builder =>
+ builder.setIsAll(true)
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing rows in this Dataset but not in another Dataset. This is
+ * equivalent to `EXCEPT DISTINCT` in SQL.
+ *
+ * @note
+ * Equality checking is performed directly on the encoded representation of the data and thus
+ * is not affected by a custom `equals` function defined on `T`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def except(other: Dataset[T]): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT) { builder =>
+ builder.setIsAll(false)
+ }
+ }
+
+ /**
+ * Returns a new Dataset containing rows in this Dataset but not in another Dataset while
+ * preserving the duplicates. This is equivalent to `EXCEPT ALL` in SQL.
+ *
+ * @note
+ * Equality checking is performed directly on the encoded representation of the data and thus
+ * is not affected by a custom `equals` function defined on `T`. Also as standard in SQL, this
+ * function resolves columns by position (not by name).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def exceptAll(other: Dataset[T]): Dataset[T] = {
+ buildSetOp(other, proto.SetOperation.SetOpType.SET_OP_TYPE_EXCEPT) { builder =>
+ builder.setIsAll(true)
+ }
+ }
+
+ /**
+ * Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a
+ * user-supplied seed.
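+ *
+ * A sketch:
+ * {{{
+ *   // about 10% of the rows (not an exact count), seeded for reproducibility
+ *   val tenth = spark.range(1000).sample(0.1, 42L)
+ * }}}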
+ *
+ * @param fraction
+ * Fraction of rows to generate, range [0.0, 1.0].
+ * @param seed
+ * Seed for sampling.
+ *
+ * @note
+ * This is NOT guaranteed to provide exactly the fraction of the count of the given
+ * [[Dataset]].
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def sample(fraction: Double, seed: Long): Dataset[T] = {
+ sample(withReplacement = false, fraction = fraction, seed = seed)
+ }
+
+ /**
+ * Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a
+ * random seed.
+ *
+ * @param fraction
+ * Fraction of rows to generate, range [0.0, 1.0].
+ *
+ * @note
+ * This is NOT guaranteed to provide exactly the fraction of the count of the given
+ * [[Dataset]].
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def sample(fraction: Double): Dataset[T] = {
+ sample(withReplacement = false, fraction = fraction)
+ }
+
+ /**
+ * Returns a new [[Dataset]] by sampling a fraction of rows, using a user-supplied seed.
+ *
+ * @param withReplacement
+ * Sample with replacement or not.
+ * @param fraction
+ * Fraction of rows to generate, range [0.0, 1.0].
+ * @param seed
+ * Seed for sampling.
+ *
+ * @note
+ * This is NOT guaranteed to provide exactly the fraction of the count of the given
+ * [[Dataset]].
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] = {
+ session.newDataset { builder =>
+ builder.getSampleBuilder
+ .setInput(plan.getRoot)
+ .setWithReplacement(withReplacement)
+ .setLowerBound(0.0d)
+ .setUpperBound(fraction)
+ .setSeed(seed)
+ }
+ }
+
+ /**
+ * Returns a new [[Dataset]] by sampling a fraction of rows, using a random seed.
+ *
+ * @param withReplacement
+ * Sample with replacement or not.
+ * @param fraction
+ * Fraction of rows to generate, range [0.0, 1.0].
+ *
+ * @note
+ * This is NOT guaranteed to provide exactly the fraction of the total count of the given
+ * [[Dataset]].
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def sample(withReplacement: Boolean, fraction: Double): Dataset[T] = {
+ sample(withReplacement, fraction, Utils.random.nextLong)
+ }
+
+ /**
+ * Randomly splits this Dataset with the provided weights.
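+ *
+ * For example (sketch; the weights here are arbitrary):
+ * {{{
+ *   // 80/20 train/test split; weights are normalized, so Array(4, 1) is equivalent
+ *   val Array(train, test) = spark.range(1000).randomSplit(Array(0.8, 0.2), 42L)
+ * }}}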
+ *
+ * @param weights
+ * weights for splits, will be normalized if they don't sum to 1.
+ * @param seed
+ * Seed for sampling.
+ *
+ * For Java API, use [[randomSplitAsList]].
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] = {
+ require(
+ weights.forall(_ >= 0),
+ s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
+ require(
+ weights.sum > 0,
+ s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
+
+ // It is possible that the underlying dataframe doesn't guarantee the ordering of rows in its
+ // constituent partitions each time a split is materialized, which could result in
+ // overlapping splits. To prevent this, we explicitly sort each input partition to make the
+ // ordering deterministic. Note that MapTypes cannot be sorted and are explicitly pruned out
+ // from the sort order.
+ // TODO we need to have a proper way of stabilizing the input data. The current approach does
+ // not work well with Spark Connect's extremely lazy nature. When the schema is modified
+ // between construction and execution the query might fail or produce wrong results. Another
+ // problem can come from data that arrives between the execution of the returned datasets.
+ val sortOrder = schema.collect {
+ case f if RowOrdering.isOrderable(f.dataType) => col(f.name).asc
+ }
+ val sortedInput = sortWithinPartitions(sortOrder: _*).plan.getRoot
+ val sum = weights.sum
+ val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
+ normalizedCumWeights
+ .sliding(2)
+ .map { case Array(low, high) =>
+ session.newDataset[T] { builder =>
+ builder.getSampleBuilder
+ .setInput(sortedInput)
+ .setWithReplacement(false)
+ .setLowerBound(low)
+ .setUpperBound(high)
+ .setSeed(seed)
+ }
+ }
+ .toArray
+ }
+
+ /**
+ * Returns a Java list that contains the randomly split Datasets with the provided weights.
+ *
+ * @param weights
+ * weights for splits, will be normalized if they don't sum to 1.
+ * @param seed
+ * Seed for sampling.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def randomSplitAsList(weights: Array[Double], seed: Long): java.util.List[Dataset[T]] = {
+ val values = randomSplit(weights, seed)
+ java.util.Arrays.asList(values: _*)
+ }
+
+ /**
+ * Randomly splits this Dataset with the provided weights.
+ *
+ * @param weights
+ * weights for splits, will be normalized if they don't sum to 1.
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def randomSplit(weights: Array[Double]): Array[Dataset[T]] = {
+ randomSplit(weights, Utils.random.nextLong)
+ }
+
+ private def withColumns(names: Seq[String], values: Seq[Column]): DataFrame = {
+ val aliases = values.zip(names).map { case (value, name) =>
+ value.name(name).expr.getAlias
+ }
+ session.newDataset { builder =>
+ builder.getWithColumnsBuilder
+ .setInput(plan.getRoot)
+ .addAllAliases(aliases.asJava)
+ }
+ }
+
+ /**
+ * Returns a new Dataset by adding a column or replacing the existing column that has the same
+ * name.
+ *
+ * `col`'s expression must only refer to attributes supplied by this Dataset. It is an error
+ * to add a column that refers to some other Dataset.
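+ *
+ * A sketch (`expr` is `org.apache.spark.sql.functions.expr`):
+ * {{{
+ *   // adds "idPlusOne"; if a column with that name already existed it would be replaced
+ *   spark.range(10).withColumn("idPlusOne", expr("id + 1"))
+ * }}}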
+ *
+ * @note
+ * this method introduces a projection internally. Therefore, calling it multiple times, for
+ * instance, via loops in order to add multiple columns can generate big plans which can cause
+ * performance issues and even `StackOverflowException`. To avoid this, use `select` with the
+ * multiple columns at once.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
+
+ /**
+ * (Scala-specific) Returns a new Dataset by adding columns or replacing the existing columns
+ * that have the same names.
+ *
+ * `colsMap` is a map from column name to column; the columns must only refer to attributes
+ * supplied by this Dataset. It is an error to add columns that refer to some other Dataset.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withColumns(colsMap: Map[String, Column]): DataFrame = {
+ val (colNames, newCols) = colsMap.toSeq.unzip
+ withColumns(colNames, newCols)
+ }
+
+ /**
+ * (Java-specific) Returns a new Dataset by adding columns or replacing the existing columns
+ * that have the same names.
+ *
+ * `colsMap` is a map from column name to column; the columns must only refer to attributes
+ * supplied by this Dataset. It is an error to add columns that refer to some other Dataset.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withColumns(colsMap: java.util.Map[String, Column]): DataFrame = withColumns(
+ colsMap.asScala.toMap)
+
+ /**
+ * Returns a new Dataset with a column renamed. This is a no-op if the schema doesn't contain
+ * `existingName`.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withColumnRenamed(existingName: String, newName: String): DataFrame = {
+ withColumnsRenamed(Collections.singletonMap(existingName, newName))
+ }
+
+ /**
+ * (Scala-specific) Returns a new Dataset with columns renamed. This is a no-op for any key in
+ * `colsMap` that does not match an existing column name.
+ *
+ * `colsMap` is a map of existing column name and new column name.
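+ *
+ * A sketch (assuming a DataFrame `df` with columns `a` and `b`):
+ * {{{
+ *   df.withColumnsRenamed(Map("a" -> "alpha", "b" -> "beta"))
+ * }}}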
+ *
+ * @throws AnalysisException
+ * if there are duplicate names in resulting projection
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @throws[AnalysisException]
+ def withColumnsRenamed(colsMap: Map[String, String]): DataFrame = {
+ withColumnsRenamed(colsMap.asJava)
+ }
+
+ /**
+ * (Java-specific) Returns a new Dataset with columns renamed. This is a no-op for any key in
+ * `colsMap` that does not match an existing column name.
+ *
+ * `colsMap` is a map of existing column name and new column name.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withColumnsRenamed(colsMap: java.util.Map[String, String]): DataFrame = {
+ session.newDataset { builder =>
+ builder.getWithColumnsRenamedBuilder
+ .setInput(plan.getRoot)
+ .putAllRenameColumnsMap(colsMap)
+ }
+ }
+
+ /**
+ * Returns a new Dataset by updating an existing column with metadata.
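+ *
+ * A sketch (assuming a DataFrame `df` with an `id` column; `MetadataBuilder` is
+ * `org.apache.spark.sql.types.MetadataBuilder`):
+ * {{{
+ *   val metadata = new MetadataBuilder().putString("description", "unique identifier").build()
+ *   df.withMetadata("id", metadata)
+ * }}}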
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def withMetadata(columnName: String, metadata: Metadata): DataFrame = {
+ val newAlias = proto.Expression.Alias
+ .newBuilder()
+ .setExpr(col(columnName).expr)
+ .addName(columnName)
+ .setMetadata(metadata.json)
+ session.newDataset { builder =>
+ builder.getWithColumnsBuilder
+ .setInput(plan.getRoot)
+ .addAliases(newAlias)
+ }
+ }
+
+ /**
+ * Returns a new Dataset with a column dropped. This is a no-op if schema doesn't contain column
+ * name.
+ *
+ * This method can only be used to drop top level columns. The `colName` string is treated
+ * literally, without further interpretation.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def drop(colName: String): DataFrame = {
+ drop(functions.col(colName))
+ }
+
+ /**
+ * Returns a new Dataset with columns dropped. This is a no-op if schema doesn't contain column
+ * name(s).
+ *
+ * This method can only be used to drop top level columns. The `colName` string is treated
+ * literally, without further interpretation.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def drop(colNames: String*): DataFrame = buildDrop(colNames.map(functions.col))
+
+ /**
+ * Returns a new Dataset with column dropped.
+ *
+ * This method can only be used to drop a top level column. This version of drop accepts a
+ * [[Column]] rather than a name. This is a no-op if the Dataset doesn't have a column with an
+ * equivalent expression.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ def drop(col: Column): DataFrame = {
+ buildDrop(col :: Nil)
+ }
+
+ /**
+ * Returns a new Dataset with columns dropped.
+ *
+ * This method can only be used to drop top level columns. This is a no-op if the Dataset
+ * doesn't have columns with equivalent expressions.
+ *
+ * @group untypedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def drop(col: Column, cols: Column*): DataFrame = buildDrop(col +: cols)
+
+ private def buildDrop(cols: Seq[Column]): DataFrame = session.newDataset { builder =>
+ builder.getDropBuilder
+ .setInput(plan.getRoot)
+ .addAllCols(cols.map(_.expr).asJava)
+ }
+
+ /**
+ * Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias
+ * for `distinct`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def dropDuplicates(): Dataset[T] = session.newDataset { builder =>
+ builder.getDeduplicateBuilder
+ .setInput(plan.getRoot)
+ .setAllColumnsAsKeys(true)
+ }
+
+ /**
+ * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the
+ * subset of columns.
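+ *
+ * For instance (sketch, assuming columns `a` and `b`):
+ * {{{
+ *   df.dropDuplicates("a" :: "b" :: Nil) // keeps one row per distinct (a, b) pair
+ * }}}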
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def dropDuplicates(colNames: Seq[String]): Dataset[T] = session.newDataset { builder =>
+ builder.getDeduplicateBuilder
+ .setInput(plan.getRoot)
+ .addAllColumnNames(colNames.asJava)
+ }
+
+ /**
+ * Returns a new Dataset with duplicate rows removed, considering only the subset of columns.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
+
+ /**
+ * Returns a new [[Dataset]] with duplicate rows removed, considering only the subset of
+ * columns.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
+ val colNames: Seq[String] = col1 +: cols
+ dropDuplicates(colNames)
+ }
+
+ /**
+ * Computes basic statistics for numeric and string columns, including count, mean, stddev, min,
+ * and max. If no columns are given, this function computes statistics for all numerical or
+ * string columns.
+ *
+ * This function is meant for exploratory data analysis, as we make no guarantee about the
+ * backward compatibility of the schema of the resulting Dataset. If you want to
+ * programmatically compute summary statistics, use the `agg` function instead.
+ *
+ * {{{
+ * ds.describe("age", "height").show()
+ *
+ * // output:
+ * // summary age   height
+ * // count   10.0  10.0
+ * // mean    53.3  178.05
+ * // stddev  11.6  15.7
+ * // min     18.0  163.0
+ * // max     92.0  192.0
+ * }}}
+ *
+ * Use [[summary]] for expanded statistics and control over which statistics to compute.
+ *
+ * @param cols
+ * Columns to compute statistics on.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def describe(cols: String*): DataFrame = session.newDataset { builder =>
+ builder.getDescribeBuilder
+ .setInput(plan.getRoot)
+ .addAllCols(cols.asJava)
+ }
+
+ /**
+ * Computes specified statistics for numeric and string columns. Available statistics are: <ul>
+ * <li>count</li> <li>mean</li> <li>stddev</li> <li>min</li> <li>max</li> <li>arbitrary
+ * approximate percentiles specified as a percentage (e.g. 75%)</li> <li>count_distinct</li>
+ * <li>approx_count_distinct</li> </ul>
+ *
+ * If no statistics are given, this function computes count, mean, stddev, min, approximate
+ * quartiles (percentiles at 25%, 50%, and 75%), and max.
+ *
+ * This function is meant for exploratory data analysis, as we make no guarantee about the
+ * backward compatibility of the schema of the resulting Dataset. If you want to
+ * programmatically compute summary statistics, use the `agg` function instead.
+ *
+ * {{{
+ * ds.summary().show()
+ *
+ * // output:
+ * // summary age   height
+ * // count   10.0  10.0
+ * // mean    53.3  178.05
+ * // stddev  11.6  15.7
+ * // min     18.0  163.0
+ * // 25%     24.0  176.0
+ * // 50%     24.0  176.0
+ * // 75%     32.0  180.0
+ * // max     92.0  192.0
+ * }}}
+ *
+ * {{{
+ * ds.summary("count", "min", "25%", "75%", "max").show()
+ *
+ * // output:
+ * // summary age   height
+ * // count   10.0  10.0
+ * // min     18.0  163.0
+ * // 25%     24.0  176.0
+ * // 75%     32.0  180.0
+ * // max     92.0  192.0
+ * }}}
+ *
+ * To do a summary for specific columns first select them:
+ *
+ * {{{
+ * ds.select("age", "height").summary().show()
+ * }}}
+ *
+ * Specify statistics to output custom summaries:
+ *
+ * {{{
+ * ds.summary("count", "count_distinct").show()
+ * }}}
+ *
+ * The distinct count isn't included by default.
+ *
+ * You can also run approximate distinct counts which are faster:
+ *
+ * {{{
+ * ds.summary("count", "approx_count_distinct").show()
+ * }}}
+ *
+ * See also [[describe]] for basic statistics.
+ *
+ * @param statistics
+ * Statistics from above list to be computed.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def summary(statistics: String*): DataFrame = session.newDataset { builder =>
+ builder.getSummaryBuilder
+ .setInput(plan.getRoot)
+ .addAllStatistics(statistics.asJava)
+ }
+
+ /**
+ * Returns the first `n` rows.
+ *
+ * @note
+ * this method should only be used if the resulting array is expected to be small, as all the
+ * data is loaded into the driver's memory.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def head(n: Int): Array[T] = limit(n).collect()
+
+ /**
+ * Returns the first row.
+ * @group action
+ * @since 3.4.0
+ */
+ def head(): T = head(1).head
+
+ /**
+ * Returns the first row. Alias for head().
+ * @group action
+ * @since 3.4.0
+ */
+ def first(): T = head()
+
+ /**
+ * Concise syntax for chaining custom transformations.
+ * {{{
+ * def featurize(ds: Dataset[T]): Dataset[U] = ...
+ *
+ * ds
+ * .transform(featurize)
+ * .transform(...)
+ * }}}
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] = t(this)
+
+ /**
+ * Returns the first `n` rows in the Dataset.
+ *
+ * Running take requires moving data into the application's driver process, and doing so with a
+ * very large `n` can crash the driver process with OutOfMemoryError.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def take(n: Int): Array[T] = head(n)
+
+ /**
+ * Returns the last `n` rows in the Dataset.
+ *
+ * Running tail requires moving data into the application's driver process, and doing so with a
+ * very large `n` can crash the driver process with OutOfMemoryError.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def tail(n: Int): Array[T] = {
+ val lastN = session.newDataset[T] { builder =>
+ builder.getTailBuilder
+ .setInput(plan.getRoot)
+ .setLimit(n)
+ }
+ lastN.collect()
+ }
+
+ /**
+ * Returns the first `n` rows in the Dataset as a list.
+ *
+ * Running take requires moving data into the application's driver process, and doing so with a
+ * very large `n` can crash the driver process with OutOfMemoryError.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n): _*)
+
+ /**
+ * Returns an array that contains all rows in this Dataset.
+ *
+ * Running collect requires moving all the data into the application's driver process, and doing
+ * so on a very large dataset can crash the driver process with OutOfMemoryError.
+ *
+ * For Java API, use [[collectAsList]].
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def collect(): Array[T] = withResult { result =>
+ result.toArray.asInstanceOf[Array[T]]
+ }
+
+ /**
+ * Returns a Java list that contains all rows in this Dataset.
+ *
+ * Running collect requires moving all the data into the application's driver process, and doing
+ * so on a very large dataset can crash the driver process with OutOfMemoryError.
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def collectAsList(): java.util.List[T] = {
+ java.util.Arrays.asList(collect(): _*)
+ }
+
+ /**
+ * Returns an iterator that contains all rows in this Dataset.
+ *
+ * The returned iterator implements [[AutoCloseable]]. For memory management, it is better to
+ * close it once you are done with it; otherwise it and the underlying data will only be cleaned
+ * up once the iterator is garbage collected.
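+ *
+ * Typical use (a sketch; `process` is a placeholder for caller code):
+ * {{{
+ *   val iterator = df.toLocalIterator()
+ *   try {
+ *     while (iterator.hasNext) process(iterator.next())
+ *   } finally {
+ *     iterator.asInstanceOf[AutoCloseable].close()
+ *   }
+ * }}}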
+ *
+ * @group action
+ * @since 3.4.0
+ */
+ def toLocalIterator(): java.util.Iterator[T] = {
+ // TODO make this a destructive iterator.
+ collectResult().iterator.asInstanceOf[java.util.Iterator[T]]
+ }
+
+ private def buildRepartition(numPartitions: Int, shuffle: Boolean): Dataset[T] = {
+ session.newDataset { builder =>
+ builder.getRepartitionBuilder
+ .setInput(plan.getRoot)
+ .setNumPartitions(numPartitions)
+ .setShuffle(shuffle)
+ }
+ }
+
+ private def buildRepartitionByExpression(
+ numPartitions: Option[Int],
+ partitionExprs: Seq[Column]): Dataset[T] = session.newDataset { builder =>
+ val repartitionBuilder = builder.getRepartitionByExpressionBuilder
+ .setInput(plan.getRoot)
+ .addAllPartitionExprs(partitionExprs.map(_.expr).asJava)
+ numPartitions.foreach(repartitionBuilder.setNumPartitions)
+ }
+
+ /**
+ * Returns a new Dataset that has exactly `numPartitions` partitions.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def repartition(numPartitions: Int): Dataset[T] = {
+ buildRepartition(numPartitions, shuffle = true)
+ }
+
+ private def repartitionByExpression(
+ numPartitions: Option[Int],
+ partitionExprs: Seq[Column]): Dataset[T] = {
+ // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments.
+ // However, we don't want to complicate the semantics of this API method.
+ // Instead, let's give users a friendly error message, pointing them to the new method.
+ val sortOrders = partitionExprs.filter(_.expr.hasSortOrder)
+ if (sortOrders.nonEmpty) {
+ throw new IllegalArgumentException(
+ s"Invalid partitionExprs specified: $sortOrders\n" +
+ s"For range partitioning use repartitionByRange(...) instead.")
+ }
+ buildRepartitionByExpression(numPartitions, partitionExprs)
+ }
+
+ /**
+ * Returns a new Dataset partitioned by the given partitioning expressions into `numPartitions`.
+ * The resulting Dataset is hash partitioned.
+ *
+ * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+ repartitionByExpression(Some(numPartitions), partitionExprs)
+ }
+
+ /**
+ * Returns a new Dataset partitioned by the given partitioning expressions, using
+ * `spark.sql.shuffle.partitions` as number of partitions. The resulting Dataset is hash
+ * partitioned.
+ *
+ * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+ *
+ * @group typedrel
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def repartition(partitionExprs: Column*): Dataset[T] = {
+ repartitionByExpression(None, partitionExprs)
+ }
+
+ private def repartitionByRange(
+ numPartitions: Option[Int],
+ partitionExprs: Seq[Column]): Dataset[T] = {
+ require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.")
+ val sortExprs = partitionExprs.map {
+ case e if e.expr.hasSortOrder => e
+ case e => e.asc
+ }
+ buildRepartitionByExpression(numPartitions, sortExprs)
+ }
+
+ /**
+ * Returns a new Dataset partitioned by the given partitioning expressions into `numPartitions`.
+ * The resulting Dataset is range partitioned.
+ *
+ * At least one partition-by expression must be specified. When no explicit sort order is
+ * specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each
+ * partition of the resulting Dataset.
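+ *
+ * A sketch (the column names are hypothetical; `col` is `org.apache.spark.sql.functions.col`):
+ * {{{
+ *   // ranges are computed over b ascending, then id descending with nulls first
+ *   df.repartitionByRange(33, col("b"), col("id").desc_nulls_first)
+ * }}}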
+ *
+ * Note that due to performance reasons this method uses sampling to estimate the ranges. Hence,
+ * the output may not be consistent, since sampling can return different values. The sample size
+ * can be controlled by the config `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
+ repartitionByRange(Some(numPartitions), partitionExprs)
+ }
+
+ /**
+ * Returns a new Dataset partitioned by the given partitioning expressions, using
+ * `spark.sql.shuffle.partitions` as number of partitions. The resulting Dataset is range
+ * partitioned.
+ *
+ * At least one partition-by expression must be specified. When no explicit sort order is
+ * specified, "ascending nulls first" is assumed. Note, the rows are not sorted in each
+ * partition of the resulting Dataset.
+ *
+ * Note that due to performance reasons this method uses sampling to estimate the ranges. Hence,
+ * the output may not be consistent, since sampling can return different values. The sample size
+ * can be controlled by the config `spark.sql.execution.rangeExchange.sampleSizePerPartition`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ @scala.annotation.varargs
+ def repartitionByRange(partitionExprs: Column*): Dataset[T] = {
+ repartitionByRange(None, partitionExprs)
+ }
+
+ /**
+ * Returns a new Dataset that has exactly `numPartitions` partitions, when fewer partitions
+ * are requested. If a larger number of partitions is requested, it will stay at the current
+ * number of partitions. Similar to coalesce defined on an `RDD`, this operation results in a
+ * narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a
+ * shuffle, instead each of the 100 new partitions will claim 10 of the current partitions.
+ *
+ * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in
+ * your computation taking place on fewer nodes than you like (e.g. one node in the case of
+ * numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step,
+ * but means the current upstream partitions will be executed in parallel (per whatever the
+ * current partitioning is).
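+ *
+ * For example (sketch):
+ * {{{
+ *   // narrow dependency: each of the 100 new partitions claims 10 of the old ones
+ *   df.repartition(1000).coalesce(100)
+ * }}}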
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def coalesce(numPartitions: Int): Dataset[T] = {
+ buildRepartition(numPartitions, shuffle = false)
+ }
+
+ /**
+ * Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias
+ * for `dropDuplicates`.
+ *
+ * Note that for a streaming [[Dataset]], this method returns distinct rows only once regardless
+ * of the output mode; this behavior may not be the same as `DISTINCT` in SQL against a
+ * streaming [[Dataset]].
+ *
+ * @note
+ * Equality checking is performed directly on the encoded representation of the data and thus
+ * is not affected by a custom `equals` function defined on `T`.
+ *
+ * @group typedrel
+ * @since 3.4.0
+ */
+ def distinct(): Dataset[T] = dropDuplicates()
+
+ /**
+ * Returns a best-effort snapshot of the files that compose this Dataset. This method simply
+ * asks each constituent BaseRelation for its respective files and takes the union of all
+ * results. Depending on the source relations, this may not find all input files. Duplicates are
+ * removed.
+ *
+ * @group basic
+ * @since 3.4.0
+ */
+ def inputFiles: Array[String] = analyze.getInputFilesList.asScala.toArray
+
+ private[sql] def analyze: proto.AnalyzePlanResponse = {
+ session.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
+ }
+
+ def collectResult(): SparkResult = session.execute(plan)
+
+ private def withResult[E](f: SparkResult => E): E = {
+ val result = collectResult()
+ try f(result)
+ finally {
+ result.close()
+ }
+ }
}
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 55452c6df40..7d6597bf6d5 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -68,7 +68,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
- def range(end: Long): Dataset[java.lang.Long] = range(0, end)
+ def range(end: Long): Dataset[Row] = range(0, end)
/**
* Creates a [[Dataset]] with a single `LongType` column named `id`, containing elements in a
@@ -76,7 +76,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
- def range(start: Long, end: Long): Dataset[java.lang.Long] = {
+ def range(start: Long, end: Long): Dataset[Row] = {
range(start, end, step = 1)
}
@@ -86,7 +86,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
- def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
+ def range(start: Long, end: Long, step: Long): Dataset[Row] = {
range(start, end, step, None)
}
@@ -96,7 +96,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
*
* @since 3.4.0
*/
- def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[java.lang.Long] = {
+ def range(start: Long, end: Long, step: Long, numPartitions: Int): Dataset[Row] = {
range(start, end, step, Option(numPartitions))
}
@@ -104,7 +104,7 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
start: Long,
end: Long,
step: Long,
- numPartitions: Option[Int]): Dataset[java.lang.Long] = {
+ numPartitions: Option[Int]): Dataset[Row] = {
newDataset { builder =>
val rangeBuilder = builder.getRangeBuilder
.setStart(start)
@@ -121,8 +121,10 @@ class SparkSession(private val client: SparkConnectClient, private val cleaner:
new Dataset[T](this, plan)
}
- private[sql] def analyze(plan: proto.Plan): proto.AnalyzePlanResponse =
- client.analyze(plan)
+ private[sql] def analyze(
+ plan: proto.Plan,
+ mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse =
+ client.analyze(plan, mode)
private[sql] def execute(plan: proto.Plan): SparkResult = {
val value = client.execute(plan)
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index 8252f8aef76..3049a0a0a5d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -70,10 +70,11 @@ class SparkConnectClient(
* @return
* A [[proto.AnalyzePlanResponse]] from the Spark Connect server.
*/
- def analyze(plan: proto.Plan): proto.AnalyzePlanResponse = {
+ def analyze(plan: proto.Plan, mode: proto.Explain.ExplainMode): proto.AnalyzePlanResponse = {
val request = proto.AnalyzePlanRequest
.newBuilder()
.setPlan(plan)
+ .setExplain(proto.Explain.newBuilder().setExplainMode(mode))
.setUserContext(userContext)
.setClientId(sessionId)
.build()
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 6332ed8c734..4b4bedaf659 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -92,6 +92,20 @@ object functions {
}
}
+ /**
+ * Parses the expression string into the column that it represents, similar to
+ * [[Dataset#selectExpr]].
+ * {{{
+ * // get the number of words of each length
+ * df.groupBy(expr("length(word)")).count()
+ * }}}
+ *
+ * @group normal_funcs
+ */
+ def expr(expr: String): Column = Column { builder =>
+ builder.getExpressionStringBuilder.setExpression(expr)
+ }
+
// scalastyle:off line.size.limit
/**
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index db2b8b26987..a0dd4746a8b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -16,6 +16,13 @@
*/
package org.apache.spark.sql
+import java.io.{ByteArrayOutputStream, PrintStream}
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.scalactic.TolerantNumerics
+
import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -23,13 +30,13 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
class ClientE2ETestSuite extends RemoteSparkSession {
// Spark Result
- test("test spark result schema") {
+ test("spark result schema") {
val df = spark.sql("select val from (values ('Hello'), ('World')) as t(val)")
val schema = df.collectResult().schema
assert(schema == StructType(StructField("val", StringType, false) :: Nil))
}
- test("test spark result array") {
+ test("spark result array") {
val df = spark.sql("select val from (values ('Hello'), ('World')) as t(val)")
val result = df.collectResult()
assert(result.length == 2)
@@ -39,7 +46,7 @@ class ClientE2ETestSuite extends RemoteSparkSession {
assert(array(1).getString(0) == "World")
}
- test("simple dataset test") {
+ test("simple dataset") {
val df = spark.range(10).limit(3)
val result = df.collectResult()
assert(result.length == 3)
@@ -49,7 +56,7 @@ class ClientE2ETestSuite extends RemoteSparkSession {
assert(array(2).getLong(0) == 2)
}
- test("simple udf test") {
+ test("simple udf") {
def dummyUdf(x: Int): Int = x + 5
val myUdf = udf(dummyUdf _)
@@ -64,4 +71,173 @@ class ClientE2ETestSuite extends RemoteSparkSession {
// TODO test large result when we can create table or view
// test("test spark large result")
+ private def captureStdOut(block: => Unit): String = {
+ val currentOut = Console.out
+ val capturedOut = new ByteArrayOutputStream()
+ val newOut = new PrintStream(new TeeOutputStream(currentOut, capturedOut))
+ Console.withOut(newOut) {
+ block
+ }
+ capturedOut.toString
+ }
+
+ private def checkFragments(result: String, fragmentsToCheck: Seq[String]): Unit = {
+ fragmentsToCheck.foreach { fragment =>
+ assert(result.contains(fragment))
+ }
+ }
+
+ private def testCapturedStdOut(block: => Unit, fragmentsToCheck: String*): Unit = {
+ checkFragments(captureStdOut(block), fragmentsToCheck)
+ }
+
+ private def testCapturedStdOut(
+ block: => Unit,
+ expectedNumLines: Int,
+ expectedMaxWidth: Int,
+ fragmentsToCheck: String*): Unit = {
+ val result = captureStdOut(block)
+ val lines = result.split('\n')
+ assert(lines.length === expectedNumLines)
+ assert(lines.map((s: String) => s.length).max <= expectedMaxWidth)
+ checkFragments(result, fragmentsToCheck)
+ }
+
+ private val simpleSchema = new StructType().add("id", "long", nullable = false)
+
+ // Dataset tests
+ test("Dataset inspection") {
+ val df = spark.range(10)
+ val local = spark.newDataset { builder =>
+ builder.getLocalRelationBuilder.setSchema(simpleSchema.catalogString)
+ }
+ assert(!df.isLocal)
+ assert(local.isLocal)
+ assert(!df.isStreaming)
+ assert(df.toString.contains("[id: bigint]"))
+ assert(df.inputFiles.isEmpty)
+ }
+
+ test("Dataset schema") {
+ val df = spark.range(10)
+ assert(df.schema === simpleSchema)
+ assert(df.dtypes === Array(("id", "LongType")))
+ assert(df.columns === Array("id"))
+ testCapturedStdOut(df.printSchema(), simpleSchema.treeString)
+ testCapturedStdOut(df.printSchema(5), simpleSchema.treeString(5))
+ }
+
+ test("Dataset explain") {
+ val df = spark.range(10)
+ val simpleExplainFragments = Seq("== Physical Plan ==")
+ testCapturedStdOut(df.explain(), simpleExplainFragments: _*)
+ testCapturedStdOut(df.explain(false), simpleExplainFragments: _*)
+ testCapturedStdOut(df.explain("simple"), simpleExplainFragments: _*)
+ val extendedExplainFragments = Seq(
+ "== Parsed Logical Plan ==",
+ "== Analyzed Logical Plan ==",
+ "== Optimized Logical Plan ==") ++
+ simpleExplainFragments
+ testCapturedStdOut(df.explain(true), extendedExplainFragments: _*)
+ testCapturedStdOut(df.explain("extended"), extendedExplainFragments: _*)
+ testCapturedStdOut(
+ df.explain("cost"),
+ simpleExplainFragments :+ "== Optimized Logical Plan ==": _*)
+ testCapturedStdOut(df.explain("codegen"), "WholeStageCodegen subtrees.")
+ testCapturedStdOut(df.explain("formatted"), "Range", "Arguments: ")
+ }
+
+ test("Dataset result collection") {
+ def checkResult(rows: TraversableOnce[Row], expectedValues: Long*): Unit = {
+ rows.toIterator.zipAll(expectedValues.iterator, null, null).foreach {
+ case (actual, expected) => assert(actual.getLong(0) === expected)
+ }
+ }
+ val df = spark.range(10)
+ checkResult(df.head() :: Nil, 0L)
+ checkResult(df.head(5), 0L, 1L, 2L, 3L, 4L)
+ checkResult(df.first() :: Nil, 0L)
+ assert(!df.isEmpty)
+ assert(df.filter("id > 100").isEmpty)
+ checkResult(df.take(3), 0L, 1L, 2L)
+ checkResult(df.tail(3), 7L, 8L, 9L)
+ checkResult(df.takeAsList(10).asScala, 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L)
+ checkResult(df.filter("id % 3 = 0").collect(), 0L, 3L, 6L, 9L)
+ checkResult(df.filter("id < 3").collectAsList().asScala, 0L, 1L, 2L)
+ val iterator = df.filter("id > 5 and id < 9").toLocalIterator()
+ try {
+ checkResult(iterator.asScala, 6L, 7L, 8L)
+ } finally {
+ iterator.asInstanceOf[AutoCloseable].close()
+ }
+ }
+
+ test("Dataset show") {
+ val df = spark.range(20)
+ testCapturedStdOut(df.show(), 24, 5, "+---+", "| id|", "| 0|", "| 19|")
+ testCapturedStdOut(
+ df.show(10),
+ 15,
+ 24,
+ "+---+",
+ "| id|",
+ "| 0|",
+ "| 9|",
+ "only showing top 10 rows")
+ val wideDf =
+ spark.range(4).selectExpr("id", "concat('very_very_very_long_string', id) as val")
+ testCapturedStdOut(
+ wideDf.show(true),
+ 8,
+ 26,
+ "+---+--------------------+",
+ "| id| val|",
+ "| 0|very_very_very_lo...|")
+ testCapturedStdOut(
+ wideDf.show(false),
+ 8,
+ 33,
+ "+---+---------------------------+",
+ "|id |val |",
+ "|2 |very_very_very_long_string2|")
+ testCapturedStdOut(
+ wideDf.show(2, truncate = false),
+ 7,
+ 33,
+ "+---+---------------------------+",
+ "|id |val |",
+ "|1 |very_very_very_long_string1|",
+ "only showing top 2 rows")
+ testCapturedStdOut(
+ df.show(8, 10, vertical = true),
+ 17,
+ 23,
+ "-RECORD 3--",
+ "id | 7",
+ "only showing top 8 rows")
+ }
+
+ test("Dataset randomSplit") {
+ implicit val tolerance = TolerantNumerics.tolerantDoubleEquality(0.01)
+
+ val df = spark.range(100)
+ def checkSample(ds: DataFrame, lower: Double, upper: Double, seed: Long): Unit = {
+ assert(ds.plan.getRoot.hasSample)
+ val sample = ds.plan.getRoot.getSample
+ assert(sample.getSeed === seed)
+ assert(sample.getLowerBound === lower)
+ assert(sample.getUpperBound === upper)
+ }
+ val Array(ds1, ds2, ds3) = df.randomSplit(Array(8, 9, 7), 123L)
+ checkSample(ds1, 0, 8.0 / 24.0, 123L)
+ checkSample(ds2, 8.0 / 24.0, 17.0 / 24.0, 123L)
+ checkSample(ds3, 17.0 / 24.0, 1.0, 123L)
+
+ val datasets = df.randomSplitAsList(Array(1, 2, 3, 4), 9L)
+ assert(datasets.size() === 4)
+ checkSample(datasets.get(0), 0, 1.0 / 10.0, 9L)
+ checkSample(datasets.get(1), 1.0 / 10.0, 3.0 / 10.0, 9L)
+ checkSample(datasets.get(2), 3.0 / 10.0, 6.0 / 10.0, 9L)
+ checkSample(datasets.get(3), 6.0 / 10.0, 1.0, 9L)
+ }
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5e7cc9f4b6d..087dcbb360a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -27,6 +27,10 @@ import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
+// TODO: Add sample tests:
+// - sample fraction: simple.sample(0.1)
+// - sample withReplacement_fraction: simple.sample(withReplacement = true, 0.11)
+// TODO: Add tests for exceptions thrown.
class DatasetSuite
extends AnyFunSuite // scalastyle:ignore funsuite
with BeforeAndAfterEach {
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 494c497c553..28ac12928fd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{functions => fn}
import org.apache.spark.sql.connect.client.SparkConnectClient
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
// scalastyle:off
/**
@@ -153,18 +153,25 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
}
}
+ private def createLocalRelation(schema: StructType): DataFrame = session.newDataset { builder =>
+ // TODO API is not consistent. Now we have two different ways of working with schemas!
+ builder.getLocalRelationBuilder.setSchema(schema.catalogString)
+ }
+
private val simpleSchema = new StructType()
.add("id", "long")
.add("a", "int")
.add("b", "double")
- private val simpleSchemaString = simpleSchema.catalogString
+ private val otherSchema = new StructType()
+ .add("a", "int")
+ .add("id", "long")
+ .add("payload", "binary")
- // We manually construct a simple empty data frame.
- private def simple = session.newDataset { builder =>
- // TODO API is not consistent. Now we have two different ways of working with schemas!
- builder.getLocalRelationBuilder.setSchema(simpleSchemaString)
- }
+ // A few helper dataframes.
+ private def simple: DataFrame = createLocalRelation(simpleSchema)
+ private def left: DataFrame = simple
+ private def right: DataFrame = createLocalRelation(otherSchema)
private def select(cs: Column*): DataFrame = simple.select(cs: _*)
@@ -190,6 +197,310 @@ class PlanGenerationTestSuite extends ConnectFunSuite with BeforeAndAfterAll wit
simple.filter(fn.col("id") === fn.lit(10L))
}
+ test("toDF") {
+ simple.toDF("x1", "x2", "x3")
+ }
+
+ test("to") {
+ simple.to(
+ new StructType()
+ .add("b", "double")
+ .add("id", "int"))
+ }
+
+ test("join inner_no_condition") {
+ left.join(right)
+ }
+
+ test("join inner_using_single_col") {
+ left.join(right, "id")
+ }
+
+ test("join inner_using_multiple_col_array") {
+ left.join(right, Array("id", "a"))
+ }
+
+ test("join inner_using_multiple_col_seq") {
+ left.join(right, Seq("id", "a"))
+ }
+
+ test("join using_single_col") {
+ left.join(right, "id", "left_semi")
+ }
+
+ test("join using_multiple_col_array") {
+ left.join(right, Array("id", "a"), "full_outer")
+ }
+
+ test("join using_multiple_col_seq") {
+ left.join(right, Seq("id", "a"), "right_outer")
+ }
+
+ test("join inner_condition") {
+ left.join(right, fn.col("a") === fn.col("a"))
+ }
+
+ test("join condition") {
+ left.join(right, fn.col("id") === fn.col("id"), "left_anti")
+ }
+
+ test("crossJoin") {
+ left.crossJoin(right)
+ }
+
+ test("sortWithinPartitions strings") {
+ simple.sortWithinPartitions("a", "id")
+ }
+
+ test("sortWithinPartitions columns") {
+ simple.sortWithinPartitions(fn.col("id"), fn.col("b"))
+ }
+
+ test("sort strings") {
+ simple.sort("b", "a")
+ }
+
+ test("sort columns") {
+ simple.sort(fn.col("id"), fn.col("b"))
+ }
+
+ test("orderBy strings") {
+ simple.sort("b", "id", "a")
+ }
+
+ test("orderBy columns") {
+ simple.sort(fn.col("id"), fn.col("b"), fn.col("a"))
+ }
+
+ test("apply") {
+ simple.select(simple.apply("a"))
+ }
+
+ test("hint") {
+ simple.hint("coalesce", 100)
+ }
+
+ test("col") {
+ simple.select(simple.col("id"), simple.col("b"))
+ }
+
+ test("colRegex") {
+ simple.select(simple.colRegex("a|id"))
+ }
+
+ test("as string") {
+ simple.as("foo")
+ }
+
+ test("as symbol") {
+ simple.as('bar)
+ }
+
+ test("alias string") {
+ simple.alias("fooz")
+ }
+
+ test("alias symbol") {
+ simple.alias("bob")
+ }
+
+ test("select strings") {
+ simple.select("id", "a")
+ }
+
+ test("selectExpr") {
+ simple.selectExpr("a + 10 as x", "id % 10 as grp")
+ }
+
+ test("filter expr") {
+ simple.filter("exp(a) < 10.0")
+ }
+
+ test("where column") {
+ simple.where(fn.col("id") === fn.lit(1L))
+ }
+
+ test("where expr") {
+ simple.where("a + id < 1000")
+ }
+
+ test("unpivot values") {
+ simple.unpivot(
+ ids = Array(fn.col("id"), fn.col("a")),
+ values = Array(fn.col("b")),
+ variableColumnName = "name",
+ valueColumnName = "value")
+ }
+
+ test("unpivot no_values") {
+ simple.unpivot(
+ ids = Array(fn.col("id")),
+ variableColumnName = "name",
+ valueColumnName = "value")
+ }
+
+ test("melt values") {
+ simple.unpivot(
+ ids = Array(fn.col("a")),
+ values = Array(fn.col("id")),
+ variableColumnName = "name",
+ valueColumnName = "value")
+ }
+
+ test("melt no_values") {
+ simple.melt(
+ ids = Array(fn.col("id"), fn.col("a")),
+ variableColumnName = "name",
+ valueColumnName = "value")
+ }
+
+ test("offset") {
+ simple.offset(1000)
+ }
+
+ test("union") {
+ simple.union(simple)
+ }
+
+ test("unionAll") {
+ simple.union(simple)
+ }
+
+ test("unionByName") {
+ simple.unionByName(right)
+ }
+
+ test("unionByName allowMissingColumns") {
+ simple.unionByName(right, allowMissingColumns = true)
+ }
+
+ test("intersect") {
+ simple.intersect(simple)
+ }
+
+ test("intersectAll") {
+ simple.intersectAll(simple)
+ }
+
+ test("except") {
+ simple.except(simple)
+ }
+
+ test("exceptAll") {
+ simple.exceptAll(simple)
+ }
+
+ test("sample fraction_seed") {
+ simple.sample(0.43, 9890823L)
+ }
+
+ test("sample withReplacement_fraction_seed") {
+ simple.sample(withReplacement = true, 0.23, 898L)
+ }
+
+ test("withColumn single") {
+ simple.withColumn("z", fn.expr("a + 100"))
+ }
+
+ test("withColumns scala_map") {
+ simple.withColumns(Map(("b", fn.lit("redacted")), ("z", fn.expr("a + 100"))))
+ }
+
+ test("withColumns java_map") {
+ val map = new java.util.HashMap[String, Column]
+ map.put("g", fn.col("id"))
+ map.put("a", fn.lit("123"))
+ simple.withColumns(map)
+ }
+
+ test("withColumnRenamed single") {
+ simple.withColumnRenamed("id", "nid")
+ }
+
+ test("withColumnRenamed scala_map") {
+ simple.withColumnsRenamed(Map(("a", "alpha"), ("b", "beta")))
+ }
+
+ test("withColumnRenamed java_map") {
+ val map = new java.util.HashMap[String, String]
+ map.put("id", "nid")
+ map.put("b", "bravo")
+ simple.withColumnsRenamed(map)
+ }
+
+ test("withMetadata") {
+ val builder = new MetadataBuilder
+ builder.putString("description", "unique identifier")
+ simple.withMetadata("id", builder.build())
+ }
+
+ test("drop single string") {
+ simple.drop("a")
+ }
+
+ test("drop multiple strings") {
+ simple.drop("id", "a", "b")
+ }
+
+ test("drop single column") {
+ simple.drop(fn.col("b"))
+ }
+
+ test("drop multiple column") {
+ simple.drop(fn.col("b"), fn.col("id"))
+ }
+
+ test("dropDuplicates") {
+ simple.dropDuplicates()
+ }
+
+ test("dropDuplicates names seq") {
+ simple.dropDuplicates("a" :: "b" :: Nil)
+ }
+
+ test("dropDuplicates names array") {
+ simple.dropDuplicates(Array("a", "id"))
+ }
+
+ test("dropDuplicates varargs") {
+ simple.dropDuplicates("a", "b", "id")
+ }
+
+ test("describe") {
+ simple.describe("id", "b")
+ }
+
+ test("summary") {
+ simple.summary("mean", "min")
+ }
+
+ test("repartition") {
+ simple.repartition(24)
+ }
+
+ test("repartition num_partitions_expressions") {
+ simple.repartition(22, fn.col("a"), fn.col("id"))
+ }
+
+ test("repartition expressions") {
+ simple.repartition(fn.col("id"), fn.col("b"))
+ }
+
+ test("repartitionByRange num_partitions_expressions") {
+ simple.repartitionByRange(33, fn.col("b"), fn.col("id").desc_nulls_first)
+ }
+
+ test("repartitionByRange expressions") {
+ simple.repartitionByRange(fn.col("a").asc, fn.col("id").desc_nulls_first)
+ }
+
+ test("coalesce") {
+ simple.coalesce(5)
+ }
+
+ test("distinct") {
+ simple.distinct()
+ }
+
/* Column API */
test("column by name") {
select(fn.col("b"))
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
index 760609a703f..8410073d6ec 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionSuite.scala
@@ -65,7 +65,7 @@ class SparkSessionSuite
.build())
.build()
val plan = proto.Plan.newBuilder().build()
- ss.analyze(plan)
+ ss.analyze(plan, proto.Explain.ExplainMode.SIMPLE)
assert(plan.equals(service.getAndClearLatestInputPlan()))
}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
index 21eed56ee78..867995efc2e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CompatibilitySuite.scala
@@ -83,7 +83,9 @@ class CompatibilitySuite extends AnyFunSuite { // scalastyle:ignore funsuite
ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.sql"),
// Skip all shaded dependencies in the client.
ProblemFilters.exclude[Problem]("org.sparkproject.*"),
- ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"))
+ ProblemFilters.exclude[Problem]("org.apache.spark.connect.proto.*"),
+ // Disable Range until we support typed APIs
+ ProblemFilters.exclude[Problem]("org.apache.spark.sql.SparkSession.range"))
val problems = allProblems
.filter { p =>
includedRules.exists(rule => rule(p))
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
similarity index 89%
rename from connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
rename to connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
index 5a70fb2741a..c30ea8c8301 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/DataTypeProtoConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/DataTypeProtoConverter.scala
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connect.planner
+package org.apache.spark.sql.connect.common
import scala.collection.convert.ImplicitConversions._
import org.apache.spark.connect.proto
-import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
- * This object offers methods to convert to/from connect proto to catalyst types.
+ * Helper class for conversions between [[DataType]] and [[proto.DataType]].
*/
object DataTypeProtoConverter {
def toCatalystType(t: proto.DataType): DataType = {
@@ -360,28 +359,4 @@ object DataTypeProtoConverter {
throw InvalidPlanInput(s"Does not support convert ${t.typeName} to connect proto types.")
}
}
-
- def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode = {
- mode match {
- case proto.WriteOperation.SaveMode.SAVE_MODE_APPEND => SaveMode.Append
- case proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE => SaveMode.Ignore
- case proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE => SaveMode.Overwrite
- case proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS => SaveMode.ErrorIfExists
- case _ =>
- throw new IllegalArgumentException(
- s"Cannot convert from WriteOperaton.SaveMode to Spark SaveMode: ${mode.getNumber}")
- }
- }
-
- def toSaveModeProto(mode: SaveMode): proto.WriteOperation.SaveMode = {
- mode match {
- case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
- case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
- case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
- case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
- case _ =>
- throw new IllegalArgumentException(
- s"Cannot convert from SaveMode to WriteOperation.SaveMode: ${mode.name()}")
- }
- }
}
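With the converter relocated to the shared common module, the same schema round-trip is available to both the client and the server. A minimal round-trip sketch, assuming the proto-direction helper is named toConnectProtoType (only toCatalystType is visible in the hunk above):

    import org.apache.spark.sql.connect.common.DataTypeProtoConverter
    import org.apache.spark.sql.types._

    // Round-trip the schema used throughout the golden tests:
    // struct<id:bigint,a:int,b:double>. toConnectProtoType is an assumed
    // name for the DataType -> proto.DataType direction.
    val catalystType: DataType = StructType(Seq(
      StructField("id", LongType),
      StructField("a", IntegerType),
      StructField("b", DoubleType)))
    val protoType = DataTypeProtoConverter.toConnectProtoType(catalystType)
    assert(DataTypeProtoConverter.toCatalystType(protoType) == catalystType)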
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala
new file mode 100644
index 00000000000..0caa4122f09
--- /dev/null
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.common
+
+/**
+ * Error thrown when a connect plan is not valid.
+ */
+final case class InvalidPlanInput(
+ private val message: String = "",
+ private val cause: Throwable = None.orNull)
+ extends Exception(message, cause)
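InvalidPlanInput lives in the common module as well, so both the client and the server can raise the same error for malformed plans. An illustrative sketch of how a planner might use it (the helper below is hypothetical, not part of this patch):

    import org.apache.spark.sql.connect.common.InvalidPlanInput

    // Hypothetical validation step: reject a LocalRelation whose schema
    // string is missing, signalling the problem with InvalidPlanInput.
    def requireSchema(schema: String): Unit = {
      if (schema == null || schema.isEmpty) {
        throw InvalidPlanInput("LocalRelation requires a non-empty schema string.")
      }
    }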
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_string.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/alias_symbol.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain
new file mode 100644
index 00000000000..b0d2c358215
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/apply.explain
@@ -0,0 +1,2 @@
+'Project ['a]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/as_string.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain
new file mode 100644
index 00000000000..b7e3d0a62ec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/as_symbol.explain
@@ -0,0 +1 @@
+LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain
new file mode 100644
index 00000000000..294fa0f73c3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/coalesce.explain
@@ -0,0 +1,2 @@
+Repartition 5, false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain
new file mode 100644
index 00000000000..0a5d0a342db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/col.explain
@@ -0,0 +1,2 @@
+'Project ['id, 'b]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain
new file mode 100644
index 00000000000..543b1faaa99
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/colRegex.explain
@@ -0,0 +1,2 @@
+'Project ['a|id]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain
new file mode 100644
index 00000000000..05c0d319db4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/crossJoin.explain
@@ -0,0 +1,3 @@
+'Join Cross
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
new file mode 100644
index 00000000000..3f88f836b4b
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/describe.explain
@@ -0,0 +1,6 @@
+Project [none#2, element_at(none#0, none#2, None, false) AS #0, element_at(none#1, none#2, None, false) AS #1]
++- !Project [none#0, none#1, none#2]
+ +- Generate explode([count,mean,stddev,min,max]), false, [none#0]
+ +- !Aggregate [map(cast(count as string), cast(count(none#0L) as string), cast(mean as string), cast(avg(none#0L) as string), cast(stddev as string), cast(stddev_samp(cast(none#0L as double)) as string), cast(min as string), cast(min(none#0L) as string), cast(max as string), cast(max(none#0L) as string)) AS #0, map(cast(count as string), cast(count(none#1) as string), cast(mean as string), cast(avg(none#1) as string), cast(stddev as string), cast(stddev_samp(none#1) as string), cas [...]
+ +- Project [none#0L, none#2]
+ +- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain
new file mode 100644
index 00000000000..e4da86166dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/distinct.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain
new file mode 100644
index 00000000000..e4da86166dd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain
new file mode 100644
index 00000000000..9f8c1bb8d91
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_array.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#0L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain
new file mode 100644
index 00000000000..cd7bbac89df
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_names_seq.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain
new file mode 100644
index 00000000000..b034ead771d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/dropDuplicates_varargs.explain
@@ -0,0 +1,2 @@
+Deduplicate [none#1, none#2, none#0L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain
new file mode 100644
index 00000000000..d50b1411784
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_column.explain
@@ -0,0 +1,2 @@
+Project [none#1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain
new file mode 100644
index 00000000000..75d78bac2d9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_multiple_strings.explain
@@ -0,0 +1,2 @@
+Project
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain
new file mode 100644
index 00000000000..d63618297e5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_column.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain
new file mode 100644
index 00000000000..7eaf23106a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/drop_single_string.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain
new file mode 100644
index 00000000000..eb3c62c6f51
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/except.explain
@@ -0,0 +1,3 @@
+'Except false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain
new file mode 100644
index 00000000000..95882222ba0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/exceptAll.explain
@@ -0,0 +1,3 @@
+'Except All true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain
new file mode 100644
index 00000000000..4a4d83cbbe3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/filter_expr.explain
@@ -0,0 +1,2 @@
+'Filter (10.0 > 'exp('a))
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain
new file mode 100644
index 00000000000..cdcf089bf7a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_udf.explain
@@ -0,0 +1,2 @@
+'Project [unresolvedalias((), None), unresolvedalias(foo('id), None), unresolvedalias(f3('id, 'id), None), unresolvedalias(bar('id, 'id, 'id), None), unresolvedalias(f_four('id, 'id, 'id, 'id), None)]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain
new file mode 100644
index 00000000000..71ec808a221
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/hint.explain
@@ -0,0 +1,2 @@
+UnresolvedHint coalesce, [100]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain
new file mode 100644
index 00000000000..89b24beb6fe
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/intersect.explain
@@ -0,0 +1,3 @@
+'Intersect false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain
new file mode 100644
index 00000000000..5159e4f5d1e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/intersectAll.explain
@@ -0,0 +1,3 @@
+'Intersect All true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain
new file mode 100644
index 00000000000..a0f818eb7db
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_condition.explain
@@ -0,0 +1,3 @@
+'Join LeftAnti, '`=`('id, 'id)
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain
new file mode 100644
index 00000000000..f0bd1db03ac
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_condition.explain
@@ -0,0 +1,3 @@
+'Join Inner, '`=`('a, 'a)
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain
new file mode 100644
index 00000000000..df290259b0a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_no_condition.explain
@@ -0,0 +1,3 @@
+'Join Inner
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain
new file mode 100644
index 00000000000..715fa26ba2e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_array.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain
new file mode 100644
index 00000000000..715fa26ba2e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_multiple_col_seq.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain
new file mode 100644
index 00000000000..bfcdcffbff9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_inner_using_single_col.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(Inner,Buffer(id))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain
new file mode 100644
index 00000000000..92056649124
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_array.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(FullOuter,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain
new file mode 100644
index 00000000000..577f544646e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_multiple_col_seq.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(RightOuter,Buffer(id, a))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain
new file mode 100644
index 00000000000..224790686d5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/join_using_single_col.explain
@@ -0,0 +1,3 @@
+'Join UsingJoin(LeftSemi,Buffer(id))
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain
new file mode 100644
index 00000000000..36566b68c95
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_no_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id, 'a), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain
new file mode 100644
index 00000000000..e11778c63bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/melt_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('a), ArraySeq(List('id)), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain
new file mode 100644
index 00000000000..2da57931059
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/offset.explain
@@ -0,0 +1,2 @@
+Offset 1000
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain
new file mode 100644
index 00000000000..f183bc7be36
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain
new file mode 100644
index 00000000000..c976a65aab5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/orderBy_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['b ASC NULLS FIRST, 'id ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain
new file mode 100644
index 00000000000..dd6c0fe4062
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition.explain
@@ -0,0 +1,2 @@
+Repartition 24, true
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain
new file mode 100644
index 00000000000..2f169429f81
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['a ASC NULLS FIRST, 'id DESC NULLS FIRST]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain
new file mode 100644
index 00000000000..054f8ad05bb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartitionByRange_num_partitions_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['b ASC NULLS FIRST, 'id DESC NULLS FIRST], 33
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain
new file mode 100644
index 00000000000..e6f09597d7d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['id, 'b]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain
new file mode 100644
index 00000000000..dbf1e52e2d3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/repartition_num_partitions_expressions.explain
@@ -0,0 +1,2 @@
+'RepartitionByExpression ['a, 'id], 22
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain
new file mode 100644
index 00000000000..9981205d7af
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_fraction_seed.explain
@@ -0,0 +1,2 @@
+Sample 0.0, 0.43, false, 9890823
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain
new file mode 100644
index 00000000000..4e2d884bb37
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sample_withReplacement_fraction_seed.explain
@@ -0,0 +1,2 @@
+Sample 0.0, 0.23, true, 898
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain
new file mode 100644
index 00000000000..53eaa9352a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/selectExpr.explain
@@ -0,0 +1,2 @@
+'Project [('a + 10) AS #0, ('id % 10) AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain
new file mode 100644
index 00000000000..61526cc6042
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/select_strings.explain
@@ -0,0 +1,2 @@
+'Project ['id, 'a]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain
new file mode 100644
index 00000000000..79ae0d702aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain
new file mode 100644
index 00000000000..9d49e22afe4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sortWithinPartitions_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['a ASC NULLS FIRST, 'id ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain
new file mode 100644
index 00000000000..79ae0d702aa
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_columns.explain
@@ -0,0 +1,2 @@
+'Sort ['id ASC NULLS FIRST, 'b ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain
new file mode 100644
index 00000000000..7bc8b68fe37
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/sort_strings.explain
@@ -0,0 +1,2 @@
+'Sort ['b ASC NULLS FIRST, 'a ASC NULLS FIRST], false
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain
new file mode 100644
index 00000000000..98f69703a4c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/summary.explain
@@ -0,0 +1,5 @@
+Project [none#3, element_at(none#0, none#3, None, false) AS #0, element_at(none#1, none#3, None, false) AS #1, element_at(none#2, none#3, None, false) AS #2]
++- !Project [none#0, none#1, none#2, none#3]
+ +- Generate explode([mean,min]), false, [none#0]
+ +- Aggregate [map(cast(mean as string), cast(avg(none#0L) as string), cast(min as string), cast(min(none#0L) as string)) AS #0, map(cast(mean as string), cast(avg(none#1) as string), cast(min as string), cast(min(none#1) as string)) AS #1, map(cast(mean as string), cast(avg(none#2) as string), cast(min as string), cast(min(none#2) as string)) AS #2]
+ +- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain
new file mode 100644
index 00000000000..cc952f01a83
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/to.explain
@@ -0,0 +1,2 @@
+Project [none#2, cast(none#0L as int) AS #0]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain
new file mode 100644
index 00000000000..aa71dcd5231
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/toDF.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1 AS #1, none#2 AS #2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain
new file mode 100644
index 00000000000..8d2530c4959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/union.explain
@@ -0,0 +1,3 @@
+Union false, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
new file mode 100644
index 00000000000..8d2530c4959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionAll.explain
@@ -0,0 +1,3 @@
+Union false, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
new file mode 100644
index 00000000000..104217a175d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName.explain
@@ -0,0 +1,3 @@
+'Union true, false
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
new file mode 100644
index 00000000000..70d1955f58d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unionByName_allowMissingColumns.explain
@@ -0,0 +1,3 @@
+'Union true, true
+:- LocalRelation <empty>, [none#0L, none#1, none#2]
++- LocalRelation <empty>, [none#0, none#1L, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain
new file mode 100644
index 00000000000..3d3331f31f9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_no_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain
new file mode 100644
index 00000000000..a7b5b530463
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/unpivot_values.explain
@@ -0,0 +1,2 @@
+'Unpivot ArraySeq('id, 'a), ArraySeq(List('b)), , [value]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain
new file mode 100644
index 00000000000..9580f08b5e6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/where_column.explain
@@ -0,0 +1,2 @@
+'Filter '`=`('id, 1)
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain
new file mode 100644
index 00000000000..0c3be1cbef3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/where_expr.explain
@@ -0,0 +1,2 @@
+'Filter (1000 > ('a + 'id))
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain
new file mode 100644
index 00000000000..2fed4e66ac5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_java_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2 AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain
new file mode 100644
index 00000000000..9a22c8ad5cb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_scala_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1 AS #0, none#2 AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain
new file mode 100644
index 00000000000..07dec2eba0f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumnRenamed_single.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain
new file mode 100644
index 00000000000..fe8ec9459bc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumn_single.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1, none#2, (100 + none#1) AS #0]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain
new file mode 100644
index 00000000000..6702dc2f18e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_java_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, 123 AS #0, none#2, none#0L AS #1L]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain
new file mode 100644
index 00000000000..97066dbe7df
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withColumns_scala_map.explain
@@ -0,0 +1,2 @@
+Project [none#0L, none#1, redacted AS #0, (100 + none#1) AS #1]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain
new file mode 100644
index 00000000000..07dec2eba0f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/explain-results/withMetadata.explain
@@ -0,0 +1,2 @@
+Project [none#0L AS #0L, none#1, none#2]
++- LocalRelation <empty>, [none#0L, none#1, none#2]
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json
new file mode 100644
index 00000000000..1209a4dfb87
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.json
@@ -0,0 +1,10 @@
+{
+ "subqueryAlias": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "alias": "fooz"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin
new file mode 100644
index 00000000000..1f969506e38
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_string.proto.bin
@@ -0,0 +1,2 @@
+�,
+$Z" struct<id:bigint,a:int,b:double>fooz
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json
new file mode 100644
index 00000000000..ba861c077fb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.json
@@ -0,0 +1,10 @@
+{
+ "subqueryAlias": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "alias": "bob"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin
new file mode 100644
index 00000000000..788038972d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/alias_symbol.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>bob
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/apply.json b/connector/connect/common/src/test/resources/query-tests/queries/apply.json
new file mode 100644
index 00000000000..d53d24efef8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/apply.json
@@ -0,0 +1,14 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin
new file mode 100644
index 00000000000..80cb4ad244e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/apply.proto.bin
@@ -0,0 +1,3 @@
+-
+$Z" struct<id:bigint,a:int,b:double>
+a
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_string.json b/connector/connect/common/src/test/resources/query-tests/queries/as_string.json
new file mode 100644
index 00000000000..501189fbe13
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_string.json
@@ -0,0 +1,10 @@
+{
+ "subqueryAlias": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "alias": "foo"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin
new file mode 100644
index 00000000000..f433909e126
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_string.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>foo
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json
new file mode 100644
index 00000000000..e2a7d2bb193
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.json
@@ -0,0 +1,10 @@
+{
+ "subqueryAlias": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "alias": "bar"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin
new file mode 100644
index 00000000000..c9ad3f8e0c5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/as_symbol.proto.bin
@@ -0,0 +1,2 @@
+�+
+$Z" struct<id:bigint,a:int,b:double>bar
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json
new file mode 100644
index 00000000000..34a329f893a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.json
@@ -0,0 +1,11 @@
+{
+ "repartition": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "numPartitions": 5,
+ "shuffle": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin
new file mode 100644
index 00000000000..ed1a6ce29fa
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/coalesce.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/col.json b/connector/connect/common/src/test/resources/query-tests/queries/col.json
new file mode 100644
index 00000000000..aa2c09ce0d7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/col.json
@@ -0,0 +1,18 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin
new file mode 100644
index 00000000000..6db0c6bb0b0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/col.proto.bin
@@ -0,0 +1,4 @@
+5
+$Z" struct<id:bigint,a:int,b:double>
+id
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json
new file mode 100644
index 00000000000..bb4c89e4516
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.json
@@ -0,0 +1,14 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedRegex": {
+ "colName": "a|id"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin
new file mode 100644
index 00000000000..1a3b97ea0f5
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/colRegex.proto.bin
@@ -0,0 +1,3 @@
+0
+$Z" struct<id:bigint,a:int,b:double>B
+a|id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json
new file mode 100644
index 00000000000..bac0eebf8ee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.json
@@ -0,0 +1,15 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_CROSS"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin
new file mode 100644
index 00000000000..b86654bb57f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/crossJoin.proto.bin
@@ -0,0 +1,2 @@
+*T
+$Z" struct<id:bigint,a:int,b:double>*Z(&struct<a:int,id:bigint,payload:binary>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/describe.json b/connector/connect/common/src/test/resources/query-tests/queries/describe.json
new file mode 100644
index 00000000000..c3e9ded3133
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/describe.json
@@ -0,0 +1,10 @@
+{
+ "describe": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": ["id", "b"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin
new file mode 100644
index 00000000000..a39ee7fc80c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/describe.proto.bin
@@ -0,0 +1,2 @@
+�-
+$Z" struct<id:bigint,a:int,b:double>idb
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
new file mode 100644
index 00000000000..094caa14208
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json
@@ -0,0 +1,10 @@
+{
+ "deduplicate": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "allColumnsAsKeys": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
new file mode 100644
index 00000000000..4892a7c7162
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin
@@ -0,0 +1,2 @@
+r(
+$Z" struct<id:bigint,a:int,b:double>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
new file mode 100644
index 00000000000..094caa14208
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json
@@ -0,0 +1,10 @@
+{
+ "deduplicate": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "allColumnsAsKeys": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
new file mode 100644
index 00000000000..4892a7c7162
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin
@@ -0,0 +1,2 @@
+r(
+$Z" struct<id:bigint,a:int,b:double>
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
new file mode 100644
index 00000000000..e55248a383e
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json
@@ -0,0 +1,10 @@
+{
+ "deduplicate": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "columnNames": ["a", "id"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
new file mode 100644
index 00000000000..51d419de570
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin
@@ -0,0 +1,2 @@
+r-
+$Z" struct<id:bigint,a:int,b:double>aid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
new file mode 100644
index 00000000000..d2a9e041418
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json
@@ -0,0 +1,10 @@
+{
+ "deduplicate": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "columnNames": ["a", "b"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
new file mode 100644
index 00000000000..57c97010bdd
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin
@@ -0,0 +1,2 @@
+r,
+$Z" struct<id:bigint,a:int,b:double>ab
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
new file mode 100644
index 00000000000..7b2468c70a6
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json
@@ -0,0 +1,10 @@
+{
+ "deduplicate": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "columnNames": ["a", "b", "id"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
new file mode 100644
index 00000000000..496dc9e4516
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin
@@ -0,0 +1,2 @@
+r0
+$Z" struct<id:bigint,a:int,b:double>abid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json
new file mode 100644
index 00000000000..275f15a04e1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.json
@@ -0,0 +1,18 @@
+{
+ "drop": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin
new file mode 100644
index 00000000000..65d2cac04cc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_column.proto.bin
@@ -0,0 +1,4 @@
+�5
+$Z" struct<id:bigint,a:int,b:double>
+b
+id
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json
new file mode 100644
index 00000000000..96d50cef533
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.json
@@ -0,0 +1,22 @@
+{
+ "drop": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin
new file mode 100644
index 00000000000..29ae3de0fcc
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_multiple_strings.proto.bin
@@ -0,0 +1,5 @@
+�<
+$Z" struct<id:bigint,a:int,b:double>
+id
+a
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json
new file mode 100644
index 00000000000..454bb1744b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.json
@@ -0,0 +1,14 @@
+{
+ "drop": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin
new file mode 100644
index 00000000000..3cf2a6b652a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_column.proto.bin
@@ -0,0 +1,3 @@
+�-
+$Z" struct<id:bigint,a:int,b:double>
+b
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json
new file mode 100644
index 00000000000..812fc60d9d2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.json
@@ -0,0 +1,14 @@
+{
+ "drop": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "cols": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin
new file mode 100644
index 00000000000..a4146717a0b
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/drop_single_string.proto.bin differ
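For orientation, the four drop golden plans above correspond to the Dataset.drop overloads. A minimal sketch of the generating calls (hypothetical setup, not the PR's test code verbatim; throughout these notes, df stands for a DataFrame over the local schema struct<id:bigint,a:int,b:double> used by all of these golden files):

    import org.apache.spark.sql.functions.col
    // df: DataFrame over struct<id:bigint,a:int,b:double> (assumed setup)
    df.drop(col("b"), col("id"))  // drop_multiple_column
    df.drop("id", "a", "b")       // drop_multiple_strings
    df.drop(col("b"))             // drop_single_column
    df.drop("a")                  // drop_single_string

Each overload lowers to a Drop relation with one unresolved attribute per dropped column.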
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/except.json b/connector/connect/common/src/test/resources/query-tests/queries/except.json
new file mode 100644
index 00000000000..d88134dc268
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/except.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_EXCEPT",
+ "isAll": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin
new file mode 100644
index 00000000000..64fcb2a45c9
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/except.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json
new file mode 100644
index 00000000000..dfb04bf53a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_EXCEPT",
+ "isAll": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin
new file mode 100644
index 00000000000..cc746e54e00
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/exceptAll.proto.bin differ
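except and exceptAll produce the same SetOperation plan apart from the isAll flag; plausibly generated by calls of this shape (otherDf assumed to share df's schema):

    df.except(otherDf)     // setOpType EXCEPT, isAll = false
    df.exceptAll(otherDf)  // setOpType EXCEPT, isAll = true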
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json
new file mode 100644
index 00000000000..77635b51172
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.json
@@ -0,0 +1,14 @@
+{
+ "filter": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "condition": {
+ "expressionString": {
+ "expression": "exp(a) \u003c 10.0"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin
new file mode 100644
index 00000000000..e7ddb9c54fa
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/filter_expr.proto.bin differ
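The string-based filter overload keeps the predicate as an unparsed expressionString in the plan, so this golden file matches a call like:

    df.filter("exp(a) < 10.0")  // parsed server-side, not by the client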
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json
new file mode 100644
index 00000000000..76738354e15
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.json
@@ -0,0 +1,96 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "commonInlineUserDefinedFunction": {
+ "scalarScalaUdf": {
+ "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQxBwIoVHJpGRUCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+ }
+ }
+ }, {
+ "commonInlineUserDefinedFunction": {
+ "functionName": "foo",
+ "deterministic": true,
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "scalarScalaUdf": {
+ "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQyWQlE7Ce2cPkCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+ "nullable": true
+ }
+ }
+ }, {
+ "commonInlineUserDefinedFunction": {
+ "functionName": "f3",
+ "deterministic": true,
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "scalarScalaUdf": {
+ "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQzyD4NN4Grh74CAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+ "nullable": true
+ }
+ }
+ }, {
+ "commonInlineUserDefinedFunction": {
+ "functionName": "bar",
+ "deterministic": true,
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "scalarScalaUdf": {
+ "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQ009cpyjjQtFMCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+ "nullable": true
+ }
+ }
+ }, {
+ "commonInlineUserDefinedFunction": {
+ "functionName": "f_four",
+ "deterministic": true,
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "scalarScalaUdf": {
+ "payload": "rO0ABXNyAC1vcmcuYXBhY2hlLnNwYXJrLnNxbC5jb25uZWN0LmNvbW1vbi5VZGZQYWNrZXR7DRDpFmMz1QIAA0wACGZ1bmN0aW9udAASTGphdmEvbGFuZy9PYmplY3Q7TAANaW5wdXRFbmNvZGVyc3QAFkxzY2FsYS9jb2xsZWN0aW9uL1NlcTtMAA1vdXRwdXRFbmNvZGVydAA4TG9yZy9hcGFjaGUvc3Bhcmsvc3FsL2NhdGFseXN0L2VuY29kZXJzL0Fnbm9zdGljRW5jb2Rlcjt4cHNyACVvcmcuYXBhY2hlLnNwYXJrLnNxbC5UZXN0VURGcyQkYW5vbiQ1hQsS9jxAO/gCAAB4cHNyACRzY2FsYS5jb2xsZWN0aW9uLm11dGFibGUuQXJyYXlCdWZmZXIVOLBTg4KOcwIAA0kAC2luaXRpYWxTaXplSQAFc2l6ZTBbAAVhcnJheXQAE1 [...]
+ "nullable": true
+ }
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin
new file mode 100644
index 00000000000..aa1971d3112
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_udf.proto.bin differ
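The scalarScalaUdf payloads in function_udf.json are Base64-encoded, Java-serialized UdfPacket instances carrying the function plus its input and output encoders. The first (anonymous) expression would plausibly come from a call shaped like the following (hypothetical function body; the real functions live in the PR's TestUDFs):

    import org.apache.spark.sql.functions.{col, udf}
    val plusOne = udf((i: Long) => i + 1)  // the closure is what gets serialized into the payload
    df.select(plusOne(col("id")))

The named variants (foo, f3, bar, f_four) additionally set functionName, presumably via UserDefinedFunction.withName.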
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/hint.json b/connector/connect/common/src/test/resources/query-tests/queries/hint.json
new file mode 100644
index 00000000000..38f3ff1ab7a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/hint.json
@@ -0,0 +1,15 @@
+{
+ "hint": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "name": "coalesce",
+ "parameters": [{
+ "literal": {
+ "integer": 100
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin
new file mode 100644
index 00000000000..8832794f792
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/hint.proto.bin differ
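Hint parameters are carried as literal expressions, so this plan matches a call like:

    df.hint("coalesce", 100)  // hint name plus one integer literal parameter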
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersect.json b/connector/connect/common/src/test/resources/query-tests/queries/intersect.json
new file mode 100644
index 00000000000..cd8167116c0
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/intersect.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_INTERSECT",
+ "isAll": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin
new file mode 100644
index 00000000000..71816354cda
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/intersect.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json
new file mode 100644
index 00000000000..9fd2a7a3727
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_INTERSECT",
+ "isAll": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin
new file mode 100644
index 00000000000..5e3325c73c1
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/intersectAll.proto.bin differ
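intersect and intersectAll mirror the except pair above, switching setOpType to INTERSECT:

    df.intersect(otherDf)     // isAll = false
    df.intersectAll(otherDf)  // isAll = true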
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json
new file mode 100644
index 00000000000..b152642b547
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.json
@@ -0,0 +1,29 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinCondition": {
+ "unresolvedFunction": {
+ "functionName": "\u003d",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ }
+ },
+ "joinType": "JOIN_TYPE_LEFT_ANTI"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin
new file mode 100644
index 00000000000..a1464a72ea8
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_condition.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json
new file mode 100644
index 00000000000..28949407af4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.json
@@ -0,0 +1,29 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinCondition": {
+ "unresolvedFunction": {
+ "functionName": "\u003d",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+ },
+ "joinType": "JOIN_TYPE_INNER"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin
new file mode 100644
index 00000000000..9d0a3ca5d08
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_condition.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json
new file mode 100644
index 00000000000..0308a128db3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.json
@@ -0,0 +1,15 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_INNER"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin
new file mode 100644
index 00000000000..9a269059bf7
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_no_condition.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json
new file mode 100644
index 00000000000..9f9f1a0cf30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_INNER",
+ "usingColumns": ["id", "a"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin
new file mode 100644
index 00000000000..0e4192d7afc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_array.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json
new file mode 100644
index 00000000000..9f9f1a0cf30
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_INNER",
+ "usingColumns": ["id", "a"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin
new file mode 100644
index 00000000000..0e4192d7afc
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_multiple_col_seq.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json
new file mode 100644
index 00000000000..b5137f978a9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_INNER",
+ "usingColumns": ["id"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin
new file mode 100644
index 00000000000..0708749bd47
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_inner_using_single_col.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json
new file mode 100644
index 00000000000..4bd0b9ec2c7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_FULL_OUTER",
+ "usingColumns": ["id", "a"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin
new file mode 100644
index 00000000000..954128f8c1e
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_array.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json
new file mode 100644
index 00000000000..3d06f7ab31f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_RIGHT_OUTER",
+ "usingColumns": ["id", "a"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin
new file mode 100644
index 00000000000..5878e776d87
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_using_multiple_col_seq.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json
new file mode 100644
index 00000000000..b5b8b3a40f9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.json
@@ -0,0 +1,16 @@
+{
+ "join": {
+ "left": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "right": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "joinType": "JOIN_TYPE_LEFT_SEMI",
+ "usingColumns": ["id"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin
new file mode 100644
index 00000000000..0c72726a8c2
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/join_using_single_col.proto.bin differ
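The join golden plans cover the main overloads: an explicit join condition, no condition, and using-columns in Seq, Array, and single-string form, each combined with a join type. A sketch (right assumed to be a DataFrame over struct<a:int,id:bigint,payload:binary>; the === comparison is assumed to mirror the regular Column API):

    df.join(right, col("id") === col("id"), "left_anti")  // join_condition
    df.join(right, col("a") === col("a"))                 // join_inner_condition
    df.join(right)                                        // join_inner_no_condition
    df.join(right, Seq("id", "a"))                        // join_inner_using_multiple_col_seq
    df.join(right, Array("id", "a"), "full_outer")        // join_using_multiple_col_array
    df.join(right, "id")                                  // join_inner_using_single_col
    df.join(right, Seq("id"), "left_semi")                // join_using_single_col

Note that the Seq and Array using-column variants serialize identically; only the golden file names differ.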
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json
new file mode 100644
index 00000000000..292adb11b17
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.json
@@ -0,0 +1,19 @@
+{
+ "unpivot": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "ids": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "valueColumnName": "value"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin
new file mode 100644
index 00000000000..dc1a40f8d48
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/melt_no_values.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json
new file mode 100644
index 00000000000..79d494c3c5d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.json
@@ -0,0 +1,22 @@
+{
+ "unpivot": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "ids": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "values": {
+ "values": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }]
+ },
+ "valueColumnName": "value"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin
new file mode 100644
index 00000000000..1ac7cb290f8
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/melt_values.proto.bin differ
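melt is the Dataset-level alias for unpivot; the two golden files exercise the overloads with and without explicit value columns. A sketch (the variable column name is not visible in the JSON above, so "key" here is a placeholder):

    df.melt(Array(col("id"), col("a")), "key", "value")         // melt_no_values
    df.melt(Array(col("a")), Array(col("id")), "key", "value")  // melt_values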
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/offset.json b/connector/connect/common/src/test/resources/query-tests/queries/offset.json
new file mode 100644
index 00000000000..e03d5072ea4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/offset.json
@@ -0,0 +1,10 @@
+{
+ "offset": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "offset": 1000
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin
new file mode 100644
index 00000000000..8c7dde8f92c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/offset.proto.bin differ
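offset simply wraps the input in an Offset relation:

    df.offset(1000)  // skip the first 1000 rows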
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json
new file mode 100644
index 00000000000..84cb594f6a1
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.json
@@ -0,0 +1,35 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin
new file mode 100644
index 00000000000..4061563400c
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json
new file mode 100644
index 00000000000..41bf89273e2
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.json
@@ -0,0 +1,35 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin
new file mode 100644
index 00000000000..e2f5ccbdd9a
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/orderBy_strings.proto.bin differ
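Both orderBy overloads wrap each column in an ascending, nulls-first SortOrder; a sketch of the generating calls:

    df.orderBy(col("id"), col("b"), col("a"))  // orderBy_columns
    df.orderBy("b", "id", "a")                 // orderBy_strings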
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition.json
new file mode 100644
index 00000000000..770b227b707
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition.json
@@ -0,0 +1,11 @@
+{
+ "repartition": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "numPartitions": 24,
+ "shuffle": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin
new file mode 100644
index 00000000000..03bb4787617
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/repartition.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json
new file mode 100644
index 00000000000..deb4c618885
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.json
@@ -0,0 +1,30 @@
+{
+ "repartitionByExpression": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "partitionExprs": [{
+ "sortOrder": {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }
+ }, {
+ "sortOrder": {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_DESCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin
new file mode 100644
index 00000000000..531737b7548
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_expressions.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json
new file mode 100644
index 00000000000..eede8a4b1ac
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.json
@@ -0,0 +1,31 @@
+{
+ "repartitionByExpression": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "partitionExprs": [{
+ "sortOrder": {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }
+ }, {
+ "sortOrder": {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_DESCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }
+ }],
+ "numPartitions": 33
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin
new file mode 100644
index 00000000000..8139f2cb397
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/repartitionByRange_num_partitions_expressions.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json
new file mode 100644
index 00000000000..c91a30eb0e9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.json
@@ -0,0 +1,18 @@
+{
+ "repartitionByExpression": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "partitionExprs": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin
new file mode 100644
index 00000000000..c217e9d9d93
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/repartition_expressions.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json
new file mode 100644
index 00000000000..d70380d1228
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.json
@@ -0,0 +1,19 @@
+{
+ "repartitionByExpression": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "partitionExprs": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "numPartitions": 22
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin
new file mode 100644
index 00000000000..47b3ab9daf5
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/repartition_num_partitions_expressions.proto.bin differ
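The repartition family maps onto Repartition (round-robin, shuffle = true) and RepartitionByExpression relations. A sketch of the generating calls (the .desc helper is assumed to match the regular Column API):

    df.repartition(24)                                   // repartition
    df.repartition(col("id"), col("b"))                  // repartition_expressions
    df.repartition(22, col("a"), col("id"))              // repartition_num_partitions_expressions
    df.repartitionByRange(col("a"), col("id").desc)      // repartitionByRange_expressions
    df.repartitionByRange(33, col("b"), col("id").desc)  // repartitionByRange_num_partitions_expressions

The range variants wrap each expression in a SortOrder, defaulting to ascending, nulls-first.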
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json
new file mode 100644
index 00000000000..3f4dbedac3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.json
@@ -0,0 +1,12 @@
+{
+ "sample": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "upperBound": 0.43,
+ "withReplacement": false,
+ "seed": "9890823"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin
new file mode 100644
index 00000000000..7754a5e213d
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sample_fraction_seed.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json
new file mode 100644
index 00000000000..5f9f6f21966
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.json
@@ -0,0 +1,12 @@
+{
+ "sample": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "upperBound": 0.23,
+ "withReplacement": true,
+ "seed": "898"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin
new file mode 100644
index 00000000000..2e1efe2e527
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sample_withReplacement_fraction_seed.proto.bin differ
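Both sample overloads serialize the fraction as upperBound and always pin a seed:

    df.sample(0.43, 9890823L)    // sample_fraction_seed, withReplacement = false
    df.sample(true, 0.23, 898L)  // sample_withReplacement_fraction_seed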
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json
new file mode 100644
index 00000000000..b38c071d823
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.json
@@ -0,0 +1,18 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "expressionString": {
+ "expression": "a + 10 as x"
+ }
+ }, {
+ "expressionString": {
+ "expression": "id % 10 as grp"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin
new file mode 100644
index 00000000000..9f203955a8d
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/selectExpr.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json
new file mode 100644
index 00000000000..cde5f0721a3
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.json
@@ -0,0 +1,18 @@
+{
+ "project": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin
new file mode 100644
index 00000000000..507d3c15e61
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/select_strings.proto.bin differ
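selectExpr keeps each SQL fragment as an expressionString, while the string-based select resolves plain column names; a sketch of the generating calls:

    df.selectExpr("a + 10 as x", "id % 10 as grp")  // selectExpr
    df.select("id", "a")                            // select_strings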
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json
new file mode 100644
index 00000000000..08d98935f40
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.json
@@ -0,0 +1,27 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin
new file mode 100644
index 00000000000..5c076fa9d12
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json
new file mode 100644
index 00000000000..6b13151641d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.json
@@ -0,0 +1,27 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin
new file mode 100644
index 00000000000..572abd9c7a0
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sortWithinPartitions_strings.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json
new file mode 100644
index 00000000000..08d98935f40
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.json
@@ -0,0 +1,27 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin
new file mode 100644
index 00000000000..5c076fa9d12
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sort_columns.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json
new file mode 100644
index 00000000000..16220ca0947
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.json
@@ -0,0 +1,27 @@
+{
+ "sort": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "order": [{
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }, {
+ "child": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ },
+ "direction": "SORT_DIRECTION_ASCENDING",
+ "nullOrdering": "SORT_NULLS_FIRST"
+ }],
+ "isGlobal": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin
new file mode 100644
index 00000000000..01a61b9c158
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/sort_strings.proto.bin differ
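sort and sortWithinPartitions share the Sort relation; the four golden files above vary only the column sets. A sketch:

    df.sortWithinPartitions(col("id"), col("b"))  // sortWithinPartitions_columns
    df.sortWithinPartitions("a", "id")            // sortWithinPartitions_strings
    df.sort(col("id"), col("b"))                  // sort_columns
    df.sort("b", "a")                             // sort_strings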
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/summary.json b/connector/connect/common/src/test/resources/query-tests/queries/summary.json
new file mode 100644
index 00000000000..48c336fbf7c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/summary.json
@@ -0,0 +1,10 @@
+{
+ "summary": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "statistics": ["mean", "min"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin
new file mode 100644
index 00000000000..69363710ef2
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/summary.proto.bin differ
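summary forwards the requested statistic names verbatim:

    df.summary("mean", "min")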
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to.json b/connector/connect/common/src/test/resources/query-tests/queries/to.json
new file mode 100644
index 00000000000..e09913ff562
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/to.json
@@ -0,0 +1,28 @@
+{
+ "toSchema": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "schema": {
+ "struct": {
+ "fields": [{
+ "name": "b",
+ "dataType": {
+ "double": {
+ }
+ },
+ "nullable": true
+ }, {
+ "name": "id",
+ "dataType": {
+ "integer": {
+ }
+ },
+ "nullable": true
+ }]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin
new file mode 100644
index 00000000000..ce3c1509673
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/to.proto.bin differ
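to(schema) serializes the target schema through the DataTypeProtoConverter moved in this commit; a sketch of the generating call:

    import org.apache.spark.sql.types._
    df.to(StructType(Seq(
      StructField("b", DoubleType),      // nullable defaults to true
      StructField("id", IntegerType))))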
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toDF.json b/connector/connect/common/src/test/resources/query-tests/queries/toDF.json
new file mode 100644
index 00000000000..e753b7d0e3a
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/toDF.json
@@ -0,0 +1,10 @@
+{
+ "toDf": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "columnNames": ["x1", "x2", "x3"]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin
new file mode 100644
index 00000000000..b88bdf99169
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/toDF.proto.bin differ
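toDF with explicit names becomes a ToDF relation carrying only the new column names:

    df.toDF("x1", "x2", "x3")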
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/union.json b/connector/connect/common/src/test/resources/query-tests/queries/union.json
new file mode 100644
index 00000000000..170e0f09cf9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/union.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_UNION",
+ "isAll": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin
new file mode 100644
index 00000000000..7c4f869e44f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/union.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json
new file mode 100644
index 00000000000..170e0f09cf9
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.json
@@ -0,0 +1,16 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_UNION",
+ "isAll": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin
new file mode 100644
index 00000000000..7c4f869e44f
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unionAll.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json
new file mode 100644
index 00000000000..433ba22fa32
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.json
@@ -0,0 +1,18 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_UNION",
+ "isAll": true,
+ "byName": true,
+ "allowMissingColumns": false
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin
new file mode 100644
index 00000000000..e08dc6d9fdf
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unionByName.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json
new file mode 100644
index 00000000000..70e770c135f
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.json
@@ -0,0 +1,18 @@
+{
+ "setOp": {
+ "leftInput": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "rightInput": {
+ "localRelation": {
+ "schema": "struct\u003ca:int,id:bigint,payload:binary\u003e"
+ }
+ },
+ "setOpType": "SET_OP_TYPE_UNION",
+ "isAll": true,
+ "byName": true,
+ "allowMissingColumns": true
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin
new file mode 100644
index 00000000000..29e0896b6c6
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unionByName_allowMissingColumns.proto.bin differ
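All four union variants reuse SetOperation with setOpType UNION and isAll = true; unionByName additionally sets byName and, optionally, allowMissingColumns. A sketch (right as in the join examples above):

    df.union(df)                                       // union
    df.unionAll(df)                                    // unionAll (alias of union)
    df.unionByName(right)                              // unionByName
    df.unionByName(right, allowMissingColumns = true)  // unionByName_allowMissingColumns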
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json
new file mode 100644
index 00000000000..662328af9a7
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.json
@@ -0,0 +1,15 @@
+{
+ "unpivot": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "ids": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }],
+ "valueColumnName": "value"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin
new file mode 100644
index 00000000000..9f2600404f2
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_no_values.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json
new file mode 100644
index 00000000000..3557958fd29
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.json
@@ -0,0 +1,26 @@
+{
+ "unpivot": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "ids": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "a"
+ }
+ }],
+ "values": {
+ "values": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "b"
+ }
+ }]
+ },
+ "valueColumnName": "value"
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin
new file mode 100644
index 00000000000..9fb45c9c0c1
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/unpivot_values.proto.bin differ
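unpivot matches the melt pair above; again the variable column name is not visible in the JSON, so "key" is a placeholder:

    df.unpivot(Array(col("id")), "key", "value")                             // unpivot_no_values
    df.unpivot(Array(col("id"), col("a")), Array(col("b")), "key", "value")  // unpivot_values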
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_column.json b/connector/connect/common/src/test/resources/query-tests/queries/where_column.json
new file mode 100644
index 00000000000..f8ec863648c
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_column.json
@@ -0,0 +1,23 @@
+{
+ "filter": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "condition": {
+ "unresolvedFunction": {
+ "functionName": "\u003d",
+ "arguments": [{
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ }, {
+ "literal": {
+ "long": "1"
+ }
+ }]
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin
new file mode 100644
index 00000000000..eae5db282ca
Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/where_column.proto.bin differ
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json
new file mode 100644
index 00000000000..c244ff0e891
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.json
@@ -0,0 +1,14 @@
+{
+ "filter": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "condition": {
+ "expressionString": {
+ "expression": "a + id \u003c 1000"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin
new file mode 100644
index 00000000000..63b44b32b63
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/where_expr.proto.bin
@@ -0,0 +1,3 @@
+"9
+$Z" struct<id:bigint,a:int,b:double>"
+
a + id < 1000
\ No newline at end of file
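
The two filter plans above differ only in how the predicate is expressed: where_column carries a serialized expression tree, while where_expr ships the raw SQL string for the server to parse. A hedged sketch, reusing the `df` assumed in the unpivot example:

    import org.apache.spark.sql.functions.col

    // df: the DataFrame sketched after the unpivot golden files,
    // schema struct<id:bigint,a:int,b:double>.

    // where_column: a typed Column predicate, lowered to an expression tree.
    val byColumn = df.where(col("id") === 1L)

    // where_expr: a filter given as an unparsed expression string.
    val byExpr = df.where("a + id < 1000")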
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json
new file mode 100644
index 00000000000..ba7d76aec99
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.json
@@ -0,0 +1,13 @@
+{
+ "withColumnsRenamed": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "renameColumnsMap": {
+ "b": "bravo",
+ "id": "nid"
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin
new file mode 100644
index 00000000000..38130a873d8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_java_map.proto.bin
@@ -0,0 +1,5 @@
+�=
+$Z" struct<id:bigint,a:int,b:double>
+
+bbravo
+idnid
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json
new file mode 100644
index 00000000000..ee507379fa4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.json
@@ -0,0 +1,13 @@
+{
+ "withColumnsRenamed": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "renameColumnsMap": {
+ "a": "alpha",
+ "b": "beta"
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin
new file mode 100644
index 00000000000..5a8cfaa51ee
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_scala_map.proto.bin
@@ -0,0 +1,5 @@
+�=
+$Z" struct<id:bigint,a:int,b:double>
+
+aalpha
+bbeta
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json
new file mode 100644
index 00000000000..8d0bfc254eb
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.json
@@ -0,0 +1,12 @@
+{
+ "withColumnsRenamed": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "renameColumnsMap": {
+ "id": "nid"
+ }
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin
new file mode 100644
index 00000000000..5c60a64d3ad
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumnRenamed_single.proto.bin
@@ -0,0 +1,3 @@
+�1
+$Z" struct<id:bigint,a:int,b:double>
+idnid
\ No newline at end of file
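
The three rename golden files all lower to the same withColumnsRenamed relation; only the way the rename map is supplied differs. A sketch under the same `df` assumption as above:

    import scala.collection.JavaConverters._

    // df: schema struct<id:bigint,a:int,b:double>, as in the earlier sketches.

    // withColumnRenamed_single: rename one column.
    val single = df.withColumnRenamed("id", "nid")

    // withColumnRenamed_scala_map: rename several columns via a Scala Map.
    val scalaMap = df.withColumnsRenamed(Map("a" -> "alpha", "b" -> "beta"))

    // withColumnRenamed_java_map: the java.util.Map overload.
    val javaMap = df.withColumnsRenamed(Map("id" -> "nid", "b" -> "bravo").asJava)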
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json
new file mode 100644
index 00000000000..5bf8b01d5c4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.json
@@ -0,0 +1,17 @@
+{
+ "withColumns": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "aliases": [{
+ "expr": {
+ "expressionString": {
+ "expression": "a + 100"
+ }
+ },
+ "name": ["z"]
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin
new file mode 100644
index 00000000000..f6693d6b8b8
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumn_single.proto.bin
@@ -0,0 +1,4 @@
+�8
+$Z" struct<id:bigint,a:int,b:double>
+"
+a + 100z
\ No newline at end of file
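
As the JSON above shows, a single withColumn call is also encoded as a withColumns relation with one alias. A one-line sketch, `df` as before:

    import org.apache.spark.sql.functions.expr

    // Adds a derived column "z"; the expression string is parsed server-side.
    val added = df.withColumn("z", expr("a + 100"))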
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json
new file mode 100644
index 00000000000..21e75d07aef
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.json
@@ -0,0 +1,24 @@
+{
+ "withColumns": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "aliases": [{
+ "expr": {
+ "literal": {
+ "string": "123"
+ }
+ },
+ "name": ["a"]
+ }, {
+ "expr": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "name": ["g"]
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin
new file mode 100644
index 00000000000..f71f6248959
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_java_map.proto.bin
@@ -0,0 +1,6 @@
+�A
+$Z" struct<id:bigint,a:int,b:double>
+
+j123a
+
+idg
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json
new file mode 100644
index 00000000000..aca9ce7db7d
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.json
@@ -0,0 +1,24 @@
+{
+ "withColumns": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "aliases": [{
+ "expr": {
+ "literal": {
+ "string": "redacted"
+ }
+ },
+ "name": ["b"]
+ }, {
+ "expr": {
+ "expressionString": {
+ "expression": "a + 100"
+ }
+ },
+ "name": ["z"]
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin
new file mode 100644
index 00000000000..52a22ed9cec
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withColumns_scala_map.proto.bin
@@ -0,0 +1,7 @@
+�K
+$Z" struct<id:bigint,a:int,b:double>
+
+
+jredactedb
+"
+a + 100z
\ No newline at end of file
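
As with the rename variants, the Scala and Java map overloads of withColumns produce structurally identical plans. A hedged sketch matching the two golden files above, `df` as before:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions.{col, expr, lit}

    // withColumns_scala_map: two aliases supplied through a Scala Map.
    val scalaCols = df.withColumns(
      Map("b" -> lit("redacted"), "z" -> expr("a + 100")))

    // withColumns_java_map: the java.util.Map overload.
    val javaCols = df.withColumns(
      Map("a" -> lit("123"), "g" -> col("id")).asJava)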
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json
new file mode 100644
index 00000000000..0f28e7655a4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.json
@@ -0,0 +1,18 @@
+{
+ "withColumns": {
+ "input": {
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "aliases": [{
+ "expr": {
+ "unresolvedAttribute": {
+ "unparsedIdentifier": "id"
+ }
+ },
+ "name": ["id"],
+ "metadata": "{\"description\":\"unique identifier\"}"
+ }]
+ }
+}
\ No newline at end of file
diff --git a/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin
new file mode 100644
index 00000000000..e9aa65874e4
--- /dev/null
+++ b/connector/connect/common/src/test/resources/query-tests/queries/withMetadata.proto.bin
@@ -0,0 +1,4 @@
+�Y
+$Z" struct<id:bigint,a:int,b:double>1
+
+idid#{"description":"unique identifier"}
\ No newline at end of file
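
withMetadata rides on the same withColumns relation, with the metadata attached to the alias as the JSON string seen in the plan above. A sketch, `df` as before:

    import org.apache.spark.sql.types.MetadataBuilder

    // Attach column metadata; it is serialized into the alias as JSON.
    val md = new MetadataBuilder()
      .putString("description", "unique identifier")
      .build()
    val tagged = df.withMetadata("id", md)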
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index f91040a1009..3e301bb8823 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -25,8 +25,9 @@ import org.apache.spark.connect.proto.Expression.ExpressionString
import org.apache.spark.connect.proto.Join.JoinType
import org.apache.spark.connect.proto.SetOperation.SetOpType
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue
+import org.apache.spark.sql.connect.planner.SaveModeConverter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -209,7 +210,7 @@ package object dsl {
mode
.map(SaveMode.valueOf(_))
- .map(DataTypeProtoConverter.toSaveModeProto(_))
+ .map(SaveModeConverter.toSaveModeProto(_))
.foreach(writeOp.setMode(_))
if (tableName.nonEmpty) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
index 38cce28ad6f..6ddaabb1b88 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connect.planner
import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala
new file mode 100644
index 00000000000..41a9f97780c
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SaveModeConverter.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.SaveMode
+
+/**
+ * Helper object for conversions between [[SaveMode]] and [[proto.WriteOperation.SaveMode]].
+ */
+object SaveModeConverter {
+ def toSaveMode(mode: proto.WriteOperation.SaveMode): SaveMode = {
+ mode match {
+ case proto.WriteOperation.SaveMode.SAVE_MODE_APPEND => SaveMode.Append
+ case proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE => SaveMode.Ignore
+ case proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE => SaveMode.Overwrite
+ case proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS => SaveMode.ErrorIfExists
+ case _ =>
+ throw new IllegalArgumentException(
+          s"Cannot convert from WriteOperation.SaveMode to Spark SaveMode: ${mode.getNumber}")
+ }
+ }
+
+ def toSaveModeProto(mode: SaveMode): proto.WriteOperation.SaveMode = {
+ mode match {
+ case SaveMode.Append => proto.WriteOperation.SaveMode.SAVE_MODE_APPEND
+ case SaveMode.Ignore => proto.WriteOperation.SaveMode.SAVE_MODE_IGNORE
+ case SaveMode.Overwrite => proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE
+ case SaveMode.ErrorIfExists => proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Cannot convert from SaveMode to WriteOperation.SaveMode: ${mode.name()}")
+ }
+ }
+}
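
A minimal round-trip sketch of the new helper, mirroring the assertions in SparkConnectProtoSuite further down:

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.connect.planner.SaveModeConverter

    // The two conversions are inverses for every defined save mode;
    // SAVE_MODE_UNSPECIFIED throws IllegalArgumentException instead.
    val protoMode = SaveModeConverter.toSaveModeProto(SaveMode.Append)
    assert(protoMode == proto.WriteOperation.SaveMode.SAVE_MODE_APPEND)
    assert(SaveModeConverter.toSaveMode(protoMode) == SaveMode.Append)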
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 2924ee29e04..88bfbe8a8e0 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, L
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, Intersect, LocalRelation, LogicalPlan, Sample, Sort, SubqueryAlias, Union, Unpivot, UnresolvedHint}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
-import org.apache.spark.sql.connect.common.UdfPacket
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput, UdfPacket}
import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.{toCatalystExpression, toCatalystValue}
import org.apache.spark.sql.connect.plugin.SparkConnectPluginRegistry
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -48,11 +48,6 @@ import org.apache.spark.sql.internal.CatalogImpl
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-final case class InvalidPlanInput(
- private val message: String = "",
- private val cause: Throwable = None.orNull)
- extends Exception(message, cause)
-
final case class InvalidCommandInput(
private val message: String = "",
private val cause: Throwable = null)
@@ -1509,7 +1504,7 @@ class SparkConnectPlanner(val session: SparkSession) {
val w = dataset.write
if (writeOperation.getMode != proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED) {
- w.mode(DataTypeProtoConverter.toSaveMode(writeOperation.getMode))
+ w.mode(SaveModeConverter.toSaveMode(writeOperation.getMode))
}
if (writeOperation.getOptionsCount > 0) {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
index 05aa2428140..0a5b4197b78 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -43,8 +43,9 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter
import org.apache.spark.sql.connect.config.Connect.CONNECT_GRPC_BINDING_PORT
-import org.apache.spark.sql.connect.planner.{DataTypeProtoConverter, SparkConnectPlanner}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExplainMode, ExtendedMode, FormattedMode, SimpleMode}
/**
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 7470d539787..3e4a0f94ea2 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.test.SharedSparkSession
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
index e94b6d137cd..f94fc215ee3 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.dsl.MockRemoteSession
import org.apache.spark.sql.connect.dsl.commands._
import org.apache.spark.sql.connect.dsl.expressions._
@@ -642,7 +643,7 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
test("SaveMode conversion tests") {
assertThrows[IllegalArgumentException](
- DataTypeProtoConverter.toSaveMode(proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED))
+ SaveModeConverter.toSaveMode(proto.WriteOperation.SaveMode.SAVE_MODE_UNSPECIFIED))
val combinations = Seq(
(SaveMode.Append, proto.WriteOperation.SaveMode.SAVE_MODE_APPEND),
@@ -650,8 +651,8 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
(SaveMode.Overwrite, proto.WriteOperation.SaveMode.SAVE_MODE_OVERWRITE),
(SaveMode.ErrorIfExists, proto.WriteOperation.SaveMode.SAVE_MODE_ERROR_IF_EXISTS))
combinations.foreach { a =>
- assert(DataTypeProtoConverter.toSaveModeProto(a._1) == a._2)
- assert(DataTypeProtoConverter.toSaveMode(a._2) == a._1)
+ assert(SaveModeConverter.toSaveModeProto(a._1) == a._2)
+ assert(SaveModeConverter.toSaveMode(a._2) == a._1)
}
}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
index dfb7f5d0f90..7abe4a4d085 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/plugin/SparkConnectPluginRegistrySuite.scala
@@ -25,8 +25,9 @@ import org.apache.spark.connect.proto.Relation
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connect.common.InvalidPlanInput
import org.apache.spark.sql.connect.config.Connect
-import org.apache.spark.sql.connect.planner.{InvalidPlanInput, SparkConnectPlanner, SparkConnectPlanTest}
+import org.apache.spark.sql.connect.planner.{SparkConnectPlanner, SparkConnectPlanTest}
import org.apache.spark.sql.test.SharedSparkSession
class DummyPlugin extends RelationPlugin {