You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/13 13:18:16 UTC
[spark] branch branch-3.2 updated: [SPARK-36033][SQL][TEST]
Validate partitioning requirements in TPCDS tests
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new a0f61cc [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
a0f61cc is described below
commit a0f61ccfe460a13fd68787bf7af42060e6e09026
Author: Wenchen Fan <cl...@gmail.com>
AuthorDate: Tue Jul 13 21:17:13 2021 +0800
[SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
### What changes were proposed in this pull request?
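Add `ValidateRequirements`, which checks that every operator's children satisfy the operator's required child distribution and ordering, and assert it on the physical plan of every TPCDS query in `PlanStabilitySuite`. As part of this, `BroadcastPartitioning` now satisfies `UnspecifiedDistribution`.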
### Why are the changes needed?
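To improve test coverage: the TPCDS plan stability tests now also check that the generated physical plans are valid, not only that they match the golden files.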
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
Closes #33248 from cloud-fan/aqe2.
Lead-authored-by: Wenchen Fan <cl...@gmail.com>
Co-authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 583173b7cc03450e818a2315d31f830fb76febca)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/plans/physical/partitioning.scala | 1 +
.../spark/sql/catalyst/DistributionSuite.scala | 4 +-
.../execution/exchange/ValidateRequirements.scala | 64 ++++++++++++++++++++++
.../org/apache/spark/sql/PlanStabilitySuite.scala | 4 +-
4 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 0f8c788..fb7089c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -347,6 +347,7 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
override val numPartitions: Int = 1
override def satisfies0(required: Distribution): Boolean = required match {
+ case UnspecifiedDistribution => true
case BroadcastDistribution(m) if m == mode => true
case _ => false
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index e94c120..02b25a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -42,7 +42,7 @@ class DistributionSuite extends SparkFunSuite {
}
test("UnspecifiedDistribution and AllTuples") {
- // except `BroadcastPartitioning`, all other partitioning can satisfy UnspecifiedDistribution
+ // all partitioning can satisfy UnspecifiedDistribution
checkSatisfied(
UnknownPartitioning(-1),
UnspecifiedDistribution,
@@ -71,7 +71,7 @@ class DistributionSuite extends SparkFunSuite {
checkSatisfied(
BroadcastPartitioning(IdentityBroadcastMode),
UnspecifiedDistribution,
- false)
+ true)
// except `BroadcastPartitioning`, all other partitioning can satisfy AllTuples if they have
// only one partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala
new file mode 100644
index 0000000..6964d9c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution._
+
+/**
+ * Verifies that each operator's child requirements hold: every child's
+ * [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]] satisfies the
+ * required [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]], and
+ * every child's output ordering satisfies the required child ordering.
+ */
+object ValidateRequirements extends Logging {
+
+  /** True iff every operator in `plan` meets its requirements; children are checked first. */
+  def validate(plan: SparkPlan): Boolean =
+    plan.children.forall(child => validate(child)) && validateInternal(plan)
+
+  /** Checks one operator against its direct children; only the first violation is logged. */
+  private def validateInternal(plan: SparkPlan): Boolean = {
+    val kids: Seq[SparkPlan] = plan.children
+    val distributions: Seq[Distribution] = plan.requiredChildDistribution
+    val orderings: Seq[Seq[SortOrder]] = plan.requiredChildOrdering
+    assert(distributions.length == kids.length && orderings.length == kids.length)
+
+    // Children required to be (hash) clustered must agree on their number of partitions.
+    val clusteredPartitionNums = kids.zip(distributions).collect {
+      case (kid, _: ClusteredDistribution | _: HashClusteredDistribution) =>
+        kid.outputPartitioning.numPartitions
+    }
+    if (clusteredPartitionNums.distinct.size > 1) {
+      logDebug(s"ValidateRequirements failed: different partition num in\n$plan")
+      false
+    } else {
+      kids.zip(distributions.zip(orderings)).find { case (kid, (distribution, ordering)) =>
+        !kid.outputPartitioning.satisfies(distribution) ||
+          !SortOrder.orderingSatisfies(kid.outputOrdering, ordering)
+      } match {
+        case Some((_, (distribution, ordering))) =>
+          logDebug(s"ValidateRequirements failed: $distribution, $ordering\n$plan")
+          false
+        case None => true
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index 0df1872..69a8c72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeSet
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
-import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ValidateRequirements}
import org.apache.spark.sql.internal.SQLConf
// scalastyle:off line.size.limit
@@ -252,6 +252,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
val plan = qe.executedPlan
val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))
+ assert(ValidateRequirements.validate(plan))
+
if (regenerateGoldenFiles) {
generateGoldenFile(plan, query + suffix, explain)
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org