Posted to commits@spark.apache.org by su...@apache.org on 2022/06/01 16:54:07 UTC
[spark] branch branch-3.3 updated: [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 000270a4ead [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
000270a4ead is described below
commit 000270a4ead61bb9d7333d05c55b02a2ec477a04
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Wed Jun 1 09:49:45 2022 -0700
[SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
After reading the code changes in #35657, my understanding is that the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` was that, for reads, Spark can ignore an unrecognized distribution and ordering, but for writes it should always be strict.
Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if a V2Expression cannot be translated, instead of returning an empty Seq.
`V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`, and the current behavior can break the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see the new UT).
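To make the behavior change concrete, here is a minimal sketch adapted from the new UT in this patch (`v2Fun` stands in for any V2 function that no FunctionCatalog can resolve):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, V2ExpressionUtils}
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    import org.apache.spark.sql.connector.expressions._
    import org.apache.spark.sql.types.StringType

    // A sort over a transform that cannot be translated to a catalyst expression.
    val unsupported = SortValue(
      ApplyTransform("v2Fun", FieldReference("a") :: Nil),
      SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
    val plan = LocalRelation(AttributeReference("a", StringType)())

    // Before this patch: the whole array sequenced to None and the method
    // fell back to Seq.empty, silently dropping the required ordering.
    // After this patch: throws AnalysisException(
    //   "v2Fun(a) ASC NULLS FIRST is not currently supported").
    V2ExpressionUtils.toCatalystOrdering(Array(unsupported), plan)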
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New UT.
Closes #36697 from pan3793/SPARK-39313.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Chao Sun <su...@apple.com>
---
.../catalyst/expressions/V2ExpressionUtils.scala | 23 +++---
.../expressions/V2ExpressionUtilsSuite.scala | 40 ++++++++++
.../sql/connector/catalog/InMemoryTable.scala | 11 ++-
.../v2/DistributionAndOrderingUtils.scala | 5 +-
.../datasources/v2/V2ScanPartitioning.scala | 4 +-
.../connector/KeyGroupedPartitioningSuite.scala | 92 +++++-----------------
6 files changed, 85 insertions(+), 90 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 596d5d8b565..c252ea5ccfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.Utils.sequenceToOption
/**
* A utility class that converts public connector expressions into Catalyst expressions.
@@ -54,19 +53,25 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
* Converts the array of input V2 [[V2SortOrder]] into their counterparts in catalyst.
*/
def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan): Seq[SortOrder] = {
- sequenceToOption(ordering.map(toCatalyst(_, query))).asInstanceOf[Option[Seq[SortOrder]]]
- .getOrElse(Seq.empty)
+ ordering.map(toCatalyst(_, query).asInstanceOf[SortOrder])
}
def toCatalyst(
+ expr: V2Expression,
+ query: LogicalPlan,
+ funCatalogOpt: Option[FunctionCatalog] = None): Expression =
+ toCatalystOpt(expr, query, funCatalogOpt)
+ .getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
+
+ def toCatalystOpt(
expr: V2Expression,
query: LogicalPlan,
funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
expr match {
case t: Transform =>
- toCatalystTransform(t, query, funCatalogOpt)
+ toCatalystTransformOpt(t, query, funCatalogOpt)
case SortValue(child, direction, nullOrdering) =>
- toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+ toCatalystOpt(child, query, funCatalogOpt).map { catalystChild =>
SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
}
case ref: FieldReference =>
@@ -76,7 +81,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
}
}
- def toCatalystTransform(
+ def toCatalystTransformOpt(
trans: Transform,
query: LogicalPlan,
funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = trans match {
@@ -89,7 +94,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
// look up the V2 function.
val numBucketsRef = AttributeReference("numBuckets", IntegerType, nullable = false)()
funCatalogOpt.flatMap { catalog =>
- loadV2Function(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
+ loadV2FunctionOpt(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
TransformExpression(bound, resolvedRefs, Some(numBuckets))
}
}
@@ -99,7 +104,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
resolveRef[NamedExpression](r, query)
}
funCatalogOpt.flatMap { catalog =>
- loadV2Function(catalog, name, resolvedRefs).map { bound =>
+ loadV2FunctionOpt(catalog, name, resolvedRefs).map { bound =>
TransformExpression(bound, resolvedRefs)
}
}
@@ -107,7 +112,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
throw new AnalysisException(s"Transform $trans is not currently supported")
}
- private def loadV2Function(
+ private def loadV2FunctionOpt(
catalog: FunctionCatalog,
name: String,
args: Seq[Expression]): Option[BoundFunction] = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala
new file mode 100644
index 00000000000..d1c23d68555
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.types.StringType
+
+class V2ExpressionUtilsSuite extends SparkFunSuite {
+
+ test("SPARK-39313: toCatalystOrdering should fail if V2Expression can not be translated") {
+ val supportedV2Sort = SortValue(
+ FieldReference("a"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
+ val unsupportedV2Sort = supportedV2Sort.copy(
+ expression = ApplyTransform("v2Fun", FieldReference("a") :: Nil))
+ val exc = intercept[AnalysisException] {
+ V2ExpressionUtils.toCatalystOrdering(
+ Array(supportedV2Sort, unsupportedV2Sort),
+ LocalRelation.apply(AttributeReference("a", StringType)()))
+ }
+ assert(exc.message.contains("v2Fun(a) ASC NULLS FIRST is not currently supported"))
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index beed9111a30..7cc97bdf297 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -29,7 +29,7 @@ import org.scalatest.Assertions._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read._
@@ -291,9 +291,12 @@ class InMemoryTable(
}
override def outputPartitioning(): Partitioning = {
- InMemoryTable.this.distribution match {
- case cd: ClusteredDistribution => new KeyGroupedPartitioning(cd.clustering(), data.size)
- case _ => new UnknownPartitioning(data.size)
+ if (InMemoryTable.this.partitioning.nonEmpty) {
+ new KeyGroupedPartitioning(
+ InMemoryTable.this.partitioning.map(_.asInstanceOf[Expression]),
+ data.size)
+ } else {
+ new UnknownPartitioning(data.size)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index 275255c9a3d..0c0b5db14ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.connector.distributions._
import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.collection.Utils.sequenceToOption
object DistributionAndOrderingUtils {
@@ -34,9 +33,7 @@ object DistributionAndOrderingUtils {
val distribution = write.requiredDistribution match {
case d: OrderedDistribution => toCatalystOrdering(d.ordering(), query)
- case d: ClusteredDistribution =>
- sequenceToOption(d.clustering.map(e => toCatalyst(e, query)))
- .getOrElse(Seq.empty[Expression])
+ case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query)).toSeq
case _: UnspecifiedDistribution => Seq.empty[Expression]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
index 8d2b3a8880c..9a5a7e6aab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.Utils.sequenceToOption
*/
object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
- case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, _) =>
+ case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) =>
val funCatalogOpt = relation.catalog.flatMap {
case c: FunctionCatalog => Some(c)
case _ => None
@@ -40,7 +40,7 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
val catalystPartitioning = scan.outputPartitioning() match {
case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
- V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt)))
+ V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
case _: UnknownPartitioning => None
case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
"type: " + p.getClass.getSimpleName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 834faedd1ce..bdbf309214f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -20,12 +20,11 @@ import java.util.Collections
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder => catalystSortOrder, TransformExpression}
+import org.apache.spark.sql.catalyst.expressions.TransformExpression
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.functions._
-import org.apache.spark.sql.connector.distributions.Distribution
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.Expressions._
@@ -83,8 +82,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val partitions: Array[Transform] = Array(Expressions.years("ts"))
// create a table with 3 partitions, partitioned by `years` transform
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+ createTable(table, schema, partitions)
sql(s"INSERT INTO testcat.ns.$table VALUES " +
s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -104,28 +102,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
physical.KeyGroupedPartitioning(catalystDistribution.clustering, partitionValues))
}
- test("non-clustered distribution: fallback to super.partitioning") {
- val partitions: Array[Transform] = Array(years("ts"))
- val ordering: Array[SortOrder] = Array(sort(FieldReference("ts"),
- SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
-
- createTable(table, schema, partitions, Distributions.ordered(ordering), ordering)
- sql(s"INSERT INTO testcat.ns.$table VALUES " +
- s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
- s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
- s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
-
- val df = sql(s"SELECT * FROM testcat.ns.$table")
- val catalystOrdering = Seq(catalystSortOrder(attr("ts"), Ascending))
- val catalystDistribution = physical.OrderedDistribution(catalystOrdering)
-
- checkQueryPlan(df, catalystDistribution, physical.UnknownPartitioning(0))
- }
-
test("non-clustered distribution: no partition") {
val partitions: Array[Transform] = Array(bucket(32, "ts"))
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+ createTable(table, schema, partitions)
val df = sql(s"SELECT * FROM testcat.ns.$table")
val distribution = physical.ClusteredDistribution(
@@ -136,8 +115,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
test("non-clustered distribution: single partition") {
val partitions: Array[Transform] = Array(bucket(32, "ts"))
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+ createTable(table, schema, partitions)
sql(s"INSERT INTO testcat.ns.$table VALUES (0, 'aaa', CAST('2020-01-01' AS timestamp))")
val df = sql(s"SELECT * FROM testcat.ns.$table")
@@ -152,9 +130,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val nonFunctionCatalog = spark.sessionState.catalogManager.catalog("testcat2")
.asInstanceOf[InMemoryTableCatalog]
val partitions: Array[Transform] = Array(bucket(32, "ts"))
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])),
- catalog = nonFunctionCatalog)
+ createTable(table, schema, partitions, catalog = nonFunctionCatalog)
sql(s"INSERT INTO testcat2.ns.$table VALUES " +
s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -174,8 +150,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
catalog.clearFunctions()
val partitions: Array[Transform] = Array(bucket(32, "ts"))
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+ createTable(table, schema, partitions)
sql(s"INSERT INTO testcat.ns.$table VALUES " +
s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -190,8 +165,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
test("non-clustered distribution: V2 bucketing disabled") {
withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
val partitions: Array[Transform] = Array(bucket(32, "ts"))
- createTable(table, schema, partitions,
- Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+ createTable(table, schema, partitions)
sql(s"INSERT INTO testcat.ns.$table VALUES " +
s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -239,11 +213,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
table: String,
schema: StructType,
partitions: Array[Transform],
- distribution: Distribution = Distributions.unspecified(),
- ordering: Array[expressions.SortOrder] = Array.empty,
catalog: InMemoryTableCatalog = catalog): Unit = {
catalog.createTable(Identifier.of(Array("ns"), table),
- schema, partitions, emptyProps, distribution, ordering, None)
+ schema, partitions, emptyProps, Distributions.unspecified(), Array.empty, None)
}
private val customers: String = "customers"
@@ -259,15 +231,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
private def testWithCustomersAndOrders(
customers_partitions: Array[Transform],
- customers_distribution: Distribution,
orders_partitions: Array[Transform],
- orders_distribution: Distribution,
expectedNumOfShuffleExecs: Int): Unit = {
- createTable(customers, customers_schema, customers_partitions, customers_distribution)
+ createTable(customers, customers_schema, customers_partitions)
sql(s"INSERT INTO testcat.ns.$customers VALUES " +
s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
- createTable(orders, orders_schema, orders_partitions, orders_distribution)
+ createTable(orders, orders_schema, orders_partitions)
sql(s"INSERT INTO testcat.ns.$orders VALUES " +
s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")
@@ -297,11 +267,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val customers_partitions = Array(bucket(4, "customer_id"))
val orders_partitions = Array(bucket(4, "customer_id"))
- testWithCustomersAndOrders(customers_partitions,
- Distributions.clustered(customers_partitions.toArray),
- orders_partitions,
- Distributions.clustered(orders_partitions.toArray),
- 0)
+ testWithCustomersAndOrders(customers_partitions, orders_partitions, 0)
}
test("partitioned join: number of buckets mismatch should trigger shuffle") {
@@ -309,22 +275,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
val orders_partitions = Array(bucket(2, "customer_id"))
// should shuffle both sides when number of buckets are not the same
- testWithCustomersAndOrders(customers_partitions,
- Distributions.clustered(customers_partitions.toArray),
- orders_partitions,
- Distributions.clustered(orders_partitions.toArray),
- 2)
+ testWithCustomersAndOrders(customers_partitions, orders_partitions, 2)
}
test("partitioned join: only one side reports partitioning") {
val customers_partitions = Array(bucket(4, "customer_id"))
val orders_partitions = Array(bucket(2, "customer_id"))
- testWithCustomersAndOrders(customers_partitions,
- Distributions.clustered(customers_partitions.toArray),
- orders_partitions,
- Distributions.unspecified(),
- 2)
+ testWithCustomersAndOrders(customers_partitions, orders_partitions, 2)
}
private val items: String = "items"
@@ -342,8 +300,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
test("partitioned join: join with two partition keys and matching & sorted partitions") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
- createTable(items, items_schema, items_partitions,
- Distributions.clustered(items_partitions.toArray))
+ createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
@@ -352,8 +309,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
- createTable(purchases, purchases_schema, purchases_partitions,
- Distributions.clustered(purchases_partitions.toArray))
+ createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
@@ -375,8 +331,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
test("partitioned join: join with two partition keys and unsorted partitions") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
- createTable(items, items_schema, items_partitions,
- Distributions.clustered(items_partitions.toArray))
+ createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
@@ -385,8 +340,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp))")
val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
- createTable(purchases, purchases_schema, purchases_partitions,
- Distributions.clustered(purchases_partitions.toArray))
+ createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
@@ -408,8 +362,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
test("partitioned join: join with two partition keys and different # of partition keys") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
- createTable(items, items_schema, items_partitions,
- Distributions.clustered(items_partitions.toArray))
+ createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
@@ -417,8 +370,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
- createTable(purchases, purchases_schema, purchases_partitions,
- Distributions.clustered(purchases_partitions.toArray))
+ createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(2, 11.0, cast('2020-01-01' as timestamp))")
@@ -439,8 +391,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
val items_partitions = Array(identity("id"))
- createTable(items, items_schema, items_partitions,
- Distributions.clustered(items_partitions.toArray))
+ createTable(items, items_schema, items_partitions)
sql(s"INSERT INTO testcat.ns.$items VALUES " +
s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
@@ -449,8 +400,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
val purchases_partitions = Array(identity("item_id"))
- createTable(purchases, purchases_schema, purchases_partitions,
- Distributions.clustered(purchases_partitions.toArray))
+ createTable(purchases, purchases_schema, purchases_partitions)
sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
s"(1, 44.0, cast('2020-01-15' as timestamp)), " +