Posted to commits@spark.apache.org by we...@apache.org on 2021/07/13 13:18:16 UTC

[spark] branch branch-3.2 updated: [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new a0f61cc  [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
a0f61cc is described below

commit a0f61ccfe460a13fd68787bf7af42060e6e09026
Author: Wenchen Fan <cl...@gmail.com>
AuthorDate: Tue Jul 13 21:17:13 2021 +0800

    [SPARK-36033][SQL][TEST] Validate partitioning requirements in TPCDS tests
    
    ### What changes were proposed in this pull request?
    
    Add a new `ValidateRequirements` checker and use it in the TPCDS plan stability tests to make sure all physical plans are valid, i.e. that every operator's partitioning requirement is satisfied.
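
    Concretely, the checker walks the physical plan bottom-up and verifies that each operator's
    required child distribution and ordering are satisfied. A condensed sketch of the idea (not the
    exact code; the real rule below also cross-checks partition counts across children of clustered
    distributions):

    ```scala
    import org.apache.spark.sql.catalyst.expressions.SortOrder
    import org.apache.spark.sql.execution.SparkPlan

    // Sketch only: recursively check that every child's output satisfies
    // its parent's distribution and ordering requirements.
    def isValid(plan: SparkPlan): Boolean =
      plan.children.forall(isValid) &&
        plan.children
          .zip(plan.requiredChildDistribution.zip(plan.requiredChildOrdering))
          .forall { case (child, (dist, order)) =>
            child.outputPartitioning.satisfies(dist) &&
              SortOrder.orderingSatisfies(child.outputOrdering, order)
          }
    ```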
    
    ### Why are the changes needed?
    
    Improve test coverage: the plan stability tests now fail if any TPCDS query produces an invalid physical plan.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    Closes #33248 from cloud-fan/aqe2.
    
    Lead-authored-by: Wenchen Fan <cl...@gmail.com>
    Co-authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 583173b7cc03450e818a2315d31f830fb76febca)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/plans/physical/partitioning.scala |  1 +
 .../spark/sql/catalyst/DistributionSuite.scala     |  4 +-
 .../execution/exchange/ValidateRequirements.scala  | 64 ++++++++++++++++++++++
 .../org/apache/spark/sql/PlanStabilitySuite.scala  |  4 +-
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 0f8c788..fb7089c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -347,6 +347,7 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
   override val numPartitions: Int = 1
 
   override def satisfies0(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
     case BroadcastDistribution(m) if m == mode => true
     case _ => false
   }
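
For context, the new `UnspecifiedDistribution` case means a broadcast-partitioned child now
satisfies operators that impose no distribution requirement. A minimal before/after sketch
(hypothetical REPL-style check, using the same IdentityBroadcastMode as the test change below):

    import org.apache.spark.sql.catalyst.plans.physical._

    val p = BroadcastPartitioning(IdentityBroadcastMode)
    p.satisfies(UnspecifiedDistribution)                      // previously false, now true
    p.satisfies(BroadcastDistribution(IdentityBroadcastMode)) // true, as before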
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
index e94c120..02b25a2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala
@@ -42,7 +42,7 @@ class DistributionSuite extends SparkFunSuite {
   }
 
   test("UnspecifiedDistribution and AllTuples") {
-    // except `BroadcastPartitioning`, all other partitioning can satisfy UnspecifiedDistribution
+    // all partitioning can satisfy UnspecifiedDistribution
     checkSatisfied(
       UnknownPartitioning(-1),
       UnspecifiedDistribution,
@@ -71,7 +71,7 @@ class DistributionSuite extends SparkFunSuite {
     checkSatisfied(
       BroadcastPartitioning(IdentityBroadcastMode),
       UnspecifiedDistribution,
-      false)
+      true)
 
     // except `BroadcastPartitioning`, all other partitioning can satisfy AllTuples if they have
     // only one partition.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala
new file mode 100644
index 0000000..6964d9c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ValidateRequirements.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution._
+
+/**
+ * Validates that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
+ * of the input data meets the
+ * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements of
+ * each operator, as well as each operator's required ordering.
+ */
+object ValidateRequirements extends Logging {
+
+  def validate(plan: SparkPlan): Boolean = {
+    plan.children.forall(validate) && validateInternal(plan)
+  }
+
+  private def validateInternal(plan: SparkPlan): Boolean = {
+    val children: Seq[SparkPlan] = plan.children
+    val requiredChildDistributions: Seq[Distribution] = plan.requiredChildDistribution
+    val requiredChildOrderings: Seq[Seq[SortOrder]] = plan.requiredChildOrdering
+    assert(requiredChildDistributions.length == children.length)
+    assert(requiredChildOrderings.length == children.length)
+
+    // Verify partition number. For (hash) clustered distribution, the corresponding children must
+    // have the same number of partitions.
+    val numPartitions = requiredChildDistributions.zipWithIndex.collect {
+      case (_: ClusteredDistribution, i) => i
+      case (_: HashClusteredDistribution, i) => i
+    }.map(i => children(i).outputPartitioning.numPartitions)
+    if (numPartitions.length > 1 && !numPartitions.tail.forall(_ == numPartitions.head)) {
+      logDebug(s"ValidateRequirements failed: different partition num in\n$plan")
+      return false
+    }
+
+    children.zip(requiredChildDistributions.zip(requiredChildOrderings)).forall {
+      case (child, (distribution, ordering))
+          if !child.outputPartitioning.satisfies(distribution)
+            || !SortOrder.orderingSatisfies(child.outputOrdering, ordering) =>
+        logDebug(s"ValidateRequirements failed: $distribution, $ordering\n$plan")
+        false
+      case _ => true
+    }
+  }
+}
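
As a usage sketch (hypothetical; the session and table name are assumed), the new helper can be
applied to any executed physical plan, which is how the plan stability suite below wires it in:

    val plan = spark.sql("SELECT * FROM t CLUSTER BY a").queryExecution.executedPlan
    assert(ValidateRequirements.validate(plan))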
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index 0df1872..69a8c72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
-import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, ValidateRequirements}
 import org.apache.spark.sql.internal.SQLConf
 
 // scalastyle:off line.size.limit
@@ -252,6 +252,8 @@ trait PlanStabilitySuite extends TPCDSBase with DisableAdaptiveExecutionSuite {
     val plan = qe.executedPlan
     val explain = normalizeLocation(normalizeIds(qe.explainString(FormattedMode)))
 
+    assert(ValidateRequirements.validate(plan))
+
     if (regenerateGoldenFiles) {
       generateGoldenFile(plan, query + suffix, explain)
     } else {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org