Posted to commits@spark.apache.org by su...@apache.org on 2022/06/01 16:54:07 UTC

[spark] branch branch-3.3 updated: [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 000270a4ead [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
000270a4ead is described below

commit 000270a4ead61bb9d7333d05c55b02a2ec477a04
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Wed Jun 1 09:49:45 2022 -0700

    [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
    
    After reading the code changes in #35657, the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` appears to be that, on the read path, Spark can ignore unrecognized distributions and orderings, but on the write path it should always be strict.
    
    Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if a V2Expression cannot be translated, instead of returning an empty Seq.
    
    `V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`, and the current behavior breaks the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see the new UT, and the short sketch after the diff stat below).
    
    This PR introduces no user-facing change.
    
    Verified by a new unit test.
    
    Closes #36697 from pan3793/SPARK-39313.
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../catalyst/expressions/V2ExpressionUtils.scala   | 23 +++---
 .../expressions/V2ExpressionUtilsSuite.scala       | 40 ++++++++++
 .../sql/connector/catalog/InMemoryTable.scala      | 11 ++-
 .../v2/DistributionAndOrderingUtils.scala          |  5 +-
 .../datasources/v2/V2ScanPartitioning.scala        |  4 +-
 .../connector/KeyGroupedPartitioningSuite.scala    | 92 +++++-----------------
 6 files changed, 85 insertions(+), 90 deletions(-)
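
Not part of the patch itself: as a minimal sketch of the behavioral change, the Scala snippet below mirrors the new V2ExpressionUtilsSuite test added further down; `v2Fun` is a hypothetical, unregistered V2 function used only to force the untranslatable case.

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, V2ExpressionUtils}
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    import org.apache.spark.sql.connector.expressions.{ApplyTransform, FieldReference, NullOrdering, SortDirection, SortValue}
    import org.apache.spark.sql.types.StringType

    // A one-column relation to resolve the field reference "a" against.
    val query = LocalRelation(AttributeReference("a", StringType)())

    // One sort order Spark can translate, and one backed by an unknown V2 function.
    val supported = SortValue(FieldReference("a"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
    val unsupported = SortValue(ApplyTransform("v2Fun", FieldReference("a") :: Nil),
      SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)

    // Before this commit, the untranslatable entry made toCatalystOrdering return Seq.empty,
    // silently dropping the connector's required write ordering. Now it fails fast instead:
    try {
      V2ExpressionUtils.toCatalystOrdering(Array(supported, unsupported), query)
    } catch {
      case e: AnalysisException =>
        println(e.message) // "v2Fun(a) ASC NULLS FIRST is not currently supported"
    }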

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 596d5d8b565..c252ea5ccfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.Utils.sequenceToOption
 
 /**
  * A utility class that converts public connector expressions into Catalyst expressions.
@@ -54,19 +53,25 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
    * Converts the array of input V2 [[V2SortOrder]] into their counterparts in catalyst.
    */
   def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan): Seq[SortOrder] = {
-    sequenceToOption(ordering.map(toCatalyst(_, query))).asInstanceOf[Option[Seq[SortOrder]]]
-      .getOrElse(Seq.empty)
+    ordering.map(toCatalyst(_, query).asInstanceOf[SortOrder])
   }
 
   def toCatalyst(
+      expr: V2Expression,
+      query: LogicalPlan,
+      funCatalogOpt: Option[FunctionCatalog] = None): Expression =
+    toCatalystOpt(expr, query, funCatalogOpt)
+        .getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
+
+  def toCatalystOpt(
       expr: V2Expression,
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
     expr match {
       case t: Transform =>
-        toCatalystTransform(t, query, funCatalogOpt)
+        toCatalystTransformOpt(t, query, funCatalogOpt)
       case SortValue(child, direction, nullOrdering) =>
-        toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+        toCatalystOpt(child, query, funCatalogOpt).map { catalystChild =>
           SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
         }
       case ref: FieldReference =>
@@ -76,7 +81,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     }
   }
 
-  def toCatalystTransform(
+  def toCatalystTransformOpt(
       trans: Transform,
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = trans match {
@@ -89,7 +94,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
       // look up the V2 function.
       val numBucketsRef = AttributeReference("numBuckets", IntegerType, nullable = false)()
       funCatalogOpt.flatMap { catalog =>
-        loadV2Function(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
+        loadV2FunctionOpt(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
           TransformExpression(bound, resolvedRefs, Some(numBuckets))
         }
       }
@@ -99,7 +104,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
         resolveRef[NamedExpression](r, query)
       }
       funCatalogOpt.flatMap { catalog =>
-        loadV2Function(catalog, name, resolvedRefs).map { bound =>
+        loadV2FunctionOpt(catalog, name, resolvedRefs).map { bound =>
           TransformExpression(bound, resolvedRefs)
         }
       }
@@ -107,7 +112,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
       throw new AnalysisException(s"Transform $trans is not currently supported")
   }
 
-  private def loadV2Function(
+  private def loadV2FunctionOpt(
       catalog: FunctionCatalog,
       name: String,
       args: Seq[Expression]): Option[BoundFunction] = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala
new file mode 100644
index 00000000000..d1c23d68555
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.types.StringType
+
+class V2ExpressionUtilsSuite extends SparkFunSuite {
+
+  test("SPARK-39313: toCatalystOrdering should fail if V2Expression can not be translated") {
+    val supportedV2Sort = SortValue(
+      FieldReference("a"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
+    val unsupportedV2Sort = supportedV2Sort.copy(
+      expression = ApplyTransform("v2Fun", FieldReference("a") :: Nil))
+    val exc = intercept[AnalysisException] {
+      V2ExpressionUtils.toCatalystOrdering(
+        Array(supportedV2Sort, unsupportedV2Sort),
+        LocalRelation.apply(AttributeReference("a", StringType)()))
+    }
+    assert(exc.message.contains("v2Fun(a) ASC NULLS FIRST is not currently supported"))
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index beed9111a30..7cc97bdf297 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -29,7 +29,7 @@ import org.scalatest.Assertions._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, DateTimeUtils}
-import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions}
+import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read._
@@ -291,9 +291,12 @@ class InMemoryTable(
     }
 
     override def outputPartitioning(): Partitioning = {
-      InMemoryTable.this.distribution match {
-        case cd: ClusteredDistribution => new KeyGroupedPartitioning(cd.clustering(), data.size)
-        case _ => new UnknownPartitioning(data.size)
+      if (InMemoryTable.this.partitioning.nonEmpty) {
+        new KeyGroupedPartitioning(
+          InMemoryTable.this.partitioning.map(_.asInstanceOf[Expression]),
+          data.size)
+      } else {
+        new UnknownPartitioning(data.size)
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
index 275255c9a3d..0c0b5db14ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DistributionAndOrderingUtils.scala
@@ -24,7 +24,6 @@ import org.apache.spark.sql.connector.distributions._
 import org.apache.spark.sql.connector.write.{RequiresDistributionAndOrdering, Write}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.collection.Utils.sequenceToOption
 
 object DistributionAndOrderingUtils {
 
@@ -34,9 +33,7 @@ object DistributionAndOrderingUtils {
 
       val distribution = write.requiredDistribution match {
         case d: OrderedDistribution => toCatalystOrdering(d.ordering(), query)
-        case d: ClusteredDistribution =>
-          sequenceToOption(d.clustering.map(e => toCatalyst(e, query)))
-            .getOrElse(Seq.empty[Expression])
+        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query)).toSeq
         case _: UnspecifiedDistribution => Seq.empty[Expression]
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
index 8d2b3a8880c..9a5a7e6aab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanPartitioning.scala
@@ -32,7 +32,7 @@ import org.apache.spark.util.collection.Utils.sequenceToOption
  */
 object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, _) =>
+    case d @ DataSourceV2ScanRelation(relation, scan: SupportsReportPartitioning, _, None) =>
       val funCatalogOpt = relation.catalog.flatMap {
         case c: FunctionCatalog => Some(c)
         case _ => None
@@ -40,7 +40,7 @@ object V2ScanPartitioning extends Rule[LogicalPlan] with SQLConfHelper {
 
       val catalystPartitioning = scan.outputPartitioning() match {
         case kgp: KeyGroupedPartitioning => sequenceToOption(kgp.keys().map(
-          V2ExpressionUtils.toCatalyst(_, relation, funCatalogOpt)))
+          V2ExpressionUtils.toCatalystOpt(_, relation, funCatalogOpt)))
         case _: UnknownPartitioning => None
         case p => throw new IllegalArgumentException("Unsupported data source V2 partitioning " +
             "type: " + p.getClass.getSimpleName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 834faedd1ce..bdbf309214f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -20,12 +20,11 @@ import java.util.Collections
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder => catalystSortOrder, TransformExpression}
+import org.apache.spark.sql.catalyst.expressions.TransformExpression
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.InMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.functions._
-import org.apache.spark.sql.connector.distributions.Distribution
 import org.apache.spark.sql.connector.distributions.Distributions
 import org.apache.spark.sql.connector.expressions._
 import org.apache.spark.sql.connector.expressions.Expressions._
@@ -83,8 +82,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val partitions: Array[Transform] = Array(Expressions.years("ts"))
 
     // create a table with 3 partitions, partitioned by `years` transform
-    createTable(table, schema, partitions,
-      Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+    createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES " +
         s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
         s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -104,28 +102,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       physical.KeyGroupedPartitioning(catalystDistribution.clustering, partitionValues))
   }
 
-  test("non-clustered distribution: fallback to super.partitioning") {
-    val partitions: Array[Transform] = Array(years("ts"))
-    val ordering: Array[SortOrder] = Array(sort(FieldReference("ts"),
-      SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
-
-    createTable(table, schema, partitions, Distributions.ordered(ordering), ordering)
-    sql(s"INSERT INTO testcat.ns.$table VALUES " +
-        s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
-        s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
-        s"(2, 'ccc', CAST('2020-01-01' AS timestamp))")
-
-    val df = sql(s"SELECT * FROM testcat.ns.$table")
-    val catalystOrdering = Seq(catalystSortOrder(attr("ts"), Ascending))
-    val catalystDistribution = physical.OrderedDistribution(catalystOrdering)
-
-    checkQueryPlan(df, catalystDistribution, physical.UnknownPartitioning(0))
-  }
-
   test("non-clustered distribution: no partition") {
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
-    createTable(table, schema, partitions,
-      Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+    createTable(table, schema, partitions)
 
     val df = sql(s"SELECT * FROM testcat.ns.$table")
     val distribution = physical.ClusteredDistribution(
@@ -136,8 +115,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   test("non-clustered distribution: single partition") {
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
-    createTable(table, schema, partitions,
-      Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+    createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES (0, 'aaa', CAST('2020-01-01' AS timestamp))")
 
     val df = sql(s"SELECT * FROM testcat.ns.$table")
@@ -152,9 +130,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val nonFunctionCatalog = spark.sessionState.catalogManager.catalog("testcat2")
         .asInstanceOf[InMemoryTableCatalog]
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
-    createTable(table, schema, partitions,
-      Distributions.clustered(partitions.map(_.asInstanceOf[Expression])),
-      catalog = nonFunctionCatalog)
+    createTable(table, schema, partitions, catalog = nonFunctionCatalog)
     sql(s"INSERT INTO testcat2.ns.$table VALUES " +
         s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
         s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -174,8 +150,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     catalog.clearFunctions()
 
     val partitions: Array[Transform] = Array(bucket(32, "ts"))
-    createTable(table, schema, partitions,
-      Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+    createTable(table, schema, partitions)
     sql(s"INSERT INTO testcat.ns.$table VALUES " +
         s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
         s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -190,8 +165,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
   test("non-clustered distribution: V2 bucketing disabled") {
     withSQLConf(SQLConf.V2_BUCKETING_ENABLED.key -> "false") {
       val partitions: Array[Transform] = Array(bucket(32, "ts"))
-      createTable(table, schema, partitions,
-        Distributions.clustered(partitions.map(_.asInstanceOf[Expression])))
+      createTable(table, schema, partitions)
       sql(s"INSERT INTO testcat.ns.$table VALUES " +
           s"(0, 'aaa', CAST('2022-01-01' AS timestamp)), " +
           s"(1, 'bbb', CAST('2021-01-01' AS timestamp)), " +
@@ -239,11 +213,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       table: String,
       schema: StructType,
       partitions: Array[Transform],
-      distribution: Distribution = Distributions.unspecified(),
-      ordering: Array[expressions.SortOrder] = Array.empty,
       catalog: InMemoryTableCatalog = catalog): Unit = {
     catalog.createTable(Identifier.of(Array("ns"), table),
-      schema, partitions, emptyProps, distribution, ordering, None)
+      schema, partitions, emptyProps, Distributions.unspecified(), Array.empty, None)
   }
 
   private val customers: String = "customers"
@@ -259,15 +231,13 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   private def testWithCustomersAndOrders(
       customers_partitions: Array[Transform],
-      customers_distribution: Distribution,
       orders_partitions: Array[Transform],
-      orders_distribution: Distribution,
       expectedNumOfShuffleExecs: Int): Unit = {
-    createTable(customers, customers_schema, customers_partitions, customers_distribution)
+    createTable(customers, customers_schema, customers_partitions)
     sql(s"INSERT INTO testcat.ns.$customers VALUES " +
         s"('aaa', 10, 1), ('bbb', 20, 2), ('ccc', 30, 3)")
 
-    createTable(orders, orders_schema, orders_partitions, orders_distribution)
+    createTable(orders, orders_schema, orders_partitions)
     sql(s"INSERT INTO testcat.ns.$orders VALUES " +
         s"(100.0, 1), (200.0, 1), (150.0, 2), (250.0, 2), (350.0, 2), (400.50, 3)")
 
@@ -297,11 +267,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val customers_partitions = Array(bucket(4, "customer_id"))
     val orders_partitions = Array(bucket(4, "customer_id"))
 
-    testWithCustomersAndOrders(customers_partitions,
-      Distributions.clustered(customers_partitions.toArray),
-      orders_partitions,
-      Distributions.clustered(orders_partitions.toArray),
-      0)
+    testWithCustomersAndOrders(customers_partitions, orders_partitions, 0)
   }
 
   test("partitioned join: number of buckets mismatch should trigger shuffle") {
@@ -309,22 +275,14 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     val orders_partitions = Array(bucket(2, "customer_id"))
 
     // should shuffle both sides when number of buckets are not the same
-    testWithCustomersAndOrders(customers_partitions,
-      Distributions.clustered(customers_partitions.toArray),
-      orders_partitions,
-      Distributions.clustered(orders_partitions.toArray),
-      2)
+    testWithCustomersAndOrders(customers_partitions, orders_partitions, 2)
   }
 
   test("partitioned join: only one side reports partitioning") {
     val customers_partitions = Array(bucket(4, "customer_id"))
     val orders_partitions = Array(bucket(2, "customer_id"))
 
-    testWithCustomersAndOrders(customers_partitions,
-      Distributions.clustered(customers_partitions.toArray),
-      orders_partitions,
-      Distributions.unspecified(),
-      2)
+    testWithCustomersAndOrders(customers_partitions, orders_partitions, 2)
   }
 
   private val items: String = "items"
@@ -342,8 +300,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   test("partitioned join: join with two partition keys and matching & sorted partitions") {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
-    createTable(items, items_schema, items_partitions,
-      Distributions.clustered(items_partitions.toArray))
+    createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
         s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
         s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
@@ -352,8 +309,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
-    createTable(purchases, purchases_schema, purchases_partitions,
-      Distributions.clustered(purchases_partitions.toArray))
+    createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
         s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
         s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
@@ -375,8 +331,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   test("partitioned join: join with two partition keys and unsorted partitions") {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
-    createTable(items, items_schema, items_partitions,
-      Distributions.clustered(items_partitions.toArray))
+    createTable(items, items_schema, items_partitions)
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
         s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp)), " +
         s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
@@ -385,8 +340,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
-    createTable(purchases, purchases_schema, purchases_partitions,
-      Distributions.clustered(purchases_partitions.toArray))
+    createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
         s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
         s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
@@ -408,8 +362,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
 
   test("partitioned join: join with two partition keys and different # of partition keys") {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
-    createTable(items, items_schema, items_partitions,
-      Distributions.clustered(items_partitions.toArray))
+    createTable(items, items_schema, items_partitions)
 
     sql(s"INSERT INTO testcat.ns.$items VALUES " +
         s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
@@ -417,8 +370,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
     val purchases_partitions = Array(bucket(8, "item_id"), days("time"))
-    createTable(purchases, purchases_schema, purchases_partitions,
-      Distributions.clustered(purchases_partitions.toArray))
+    createTable(purchases, purchases_schema, purchases_partitions)
     sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
         s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
         s"(2, 11.0, cast('2020-01-01' as timestamp))")
@@ -439,8 +391,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
         SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
         SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
       val items_partitions = Array(identity("id"))
-      createTable(items, items_schema, items_partitions,
-        Distributions.clustered(items_partitions.toArray))
+      createTable(items, items_schema, items_partitions)
       sql(s"INSERT INTO testcat.ns.$items VALUES " +
           s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
           s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
@@ -449,8 +400,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
           s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
 
       val purchases_partitions = Array(identity("item_id"))
-      createTable(purchases, purchases_schema, purchases_partitions,
-        Distributions.clustered(purchases_partitions.toArray))
+      createTable(purchases, purchases_schema, purchases_partitions)
       sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
           s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
           s"(1, 44.0, cast('2020-01-15' as timestamp)), " +


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org