You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/15 01:45:55 UTC

[GitHub] [spark] c21 commented on a change in pull request #32875: [SPARK-35703][SQL] Relax constraint for bucket join and remove HashClusteredDistribution

c21 commented on a change in pull request #32875:
URL: https://github.com/apache/spark/pull/32875#discussion_r769159792



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -352,3 +350,142 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
     case _ => false
   }
 }
+
+/**
+ * This is used in the scenario where an operator has multiple children (e.g., join) and one or more
+ * of which have their own requirement regarding whether its data can be considered as
+ * co-partitioned from others. This offers APIs for:
+ *
+ *   1. Comparing with specs from other children of the operator and check if they are compatible.
+ *      When two specs are compatible, we can say their data are co-partitioned, and Spark will
+ *      potentially able to eliminate shuffle if necessary.
+ *   1. Creating a partitioning that can be used to re-partition another child, so that to make it

Review comment:
       nit: `2. `

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -352,3 +350,142 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
     case _ => false
   }
 }
+
+/**
+ * This is used in the scenario where an operator has multiple children (e.g., join) and one or more
+ * of which have their own requirement regarding whether its data can be considered as
+ * co-partitioned from others. This offers APIs for:
+ *
+ *   1. Comparing with specs from other children of the operator and check if they are compatible.
+ *      When two specs are compatible, we can say their data are co-partitioned, and Spark will
+ *      potentially able to eliminate shuffle if necessary.
+ *   1. Creating a partitioning that can be used to re-partition another child, so that to make it
+ *      having a compatible partitioning as this node.
+ */
+trait ShuffleSpec {
+  /**
+   * Returns the number of partitions of this shuffle spec
+   */
+  def numPartitions: Int
+
+  /**
+   * Returns true iff this spec is compatible with the provided shuffle spec.
+   *
+   * A true return value means that the data partitioning from this spec can be seen as
+   * co-partitioned with the `other`, and therefore no shuffle is required when joining the two
+   * sides.
+   */
+  def isCompatibleWith(other: ShuffleSpec): Boolean
+
+  /**
+   * Whether this shuffle spec can be used to create partitionings for the other children.
+   */
+  def canCreatePartitioning: Boolean = false
+
+  /**
+   * Creates a partitioning that can be used to re-partitioned the other side with the given
+   * clustering expressions.
+   *
+   * This will only be called when:
+   *  - [[canCreatePartitioning]] returns true.
+   *  - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
+   */
+  def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    throw new UnsupportedOperationException("Operation unsupported for " +
+        s"${getClass.getCanonicalName}")
+}
+
+case object SinglePartitionShuffleSpec extends ShuffleSpec {
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = {
+    other.numPartitions == numPartitions
+  }
+
+  override def canCreatePartitioning: Boolean = true
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    SinglePartition
+
+  override def numPartitions: Int = 1
+}
+
+case class RangeShuffleSpec(
+    numPartitions: Int,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec => numPartitions == 1
+    case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith)
+    case _ => false
+  }
+}
+
+case class HashShuffleSpec(
+    partitioning: HashPartitioning,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+  lazy val hashKeyPositions =
+    createHashKeyPositions(distribution.clustering, partitioning.expressions)
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      partitioning.numPartitions == 1
+    case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution) =>
+      // we need to check:
+      //  1. both partitioning have the same number of partitions
+      //  2. both partitioning have the same number of expressions
+      //  3. each pair of expression from both has overlapping positions in their
+      //     corresponding distributions.
+      partitioning.numPartitions == otherPartitioning.numPartitions &&
+      partitioning.expressions.length == otherPartitioning.expressions.length && {
+        val otherHashKeyPositions = otherHashSpec.hashKeyPositions
+        hashKeyPositions.zip(otherHashKeyPositions).forall { case (left, right) =>
+          left.intersect(right).nonEmpty
+        }
+      }
+    case ShuffleSpecCollection(specs) =>
+      specs.exists(isCompatibleWith)
+    case _ =>
+      false
+  }
+
+  override def canCreatePartitioning: Boolean = true
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    val exprs = hashKeyPositions.map(v => clustering(v.head))
+    HashPartitioning(exprs, partitioning.numPartitions)
+  }
+
+  override def numPartitions: Int = partitioning.numPartitions
+
+  /**
+   * Returns a sequence where each element is a set of positions of the key in `hashKeys` to its
+   * positions in `requiredClusterKeys`. For instance, if `requiredClusterKeys` is [a, b, b] and
+   * `hashKeys` is [a, b], the result will be [(0), (1, 2)].
+   */
+  private def createHashKeyPositions(
+      requiredClusterKeys: Seq[Expression],
+      hashKeys: Seq[Expression]): Seq[mutable.BitSet] = {
+    val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
+    requiredClusterKeys.zipWithIndex.foreach { case (distKey, distKeyPos) =>
+      distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
+    }
+
+    hashKeys.map(k => distKeyToPos(k.canonicalized))
+  }
+}
+
+case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = {
+    specs.exists(_.isCompatibleWith(other))
+  }
+
+  override def canCreatePartitioning: Boolean =
+    specs.forall(_.canCreatePartitioning)
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
+    // as we only consider # of partitions as the cost now, it doesn't matter which one we choose
+    // since they should all have the same # of partitions.

Review comment:
       shall we make the assumption more explicitly by adding a `require()` inside this class? 

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -352,3 +350,142 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
     case _ => false
   }
 }
+
+/**
+ * This is used in the scenario where an operator has multiple children (e.g., join) and one or more
+ * of which have their own requirement regarding whether its data can be considered as
+ * co-partitioned from others. This offers APIs for:
+ *
+ *   1. Comparing with specs from other children of the operator and check if they are compatible.
+ *      When two specs are compatible, we can say their data are co-partitioned, and Spark will
+ *      potentially able to eliminate shuffle if necessary.

Review comment:
       nit: `potentially be able`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -68,63 +68,95 @@ case class EnsureRequirements(
     // Get the indexes of children which have specified distribution requirements and need to have
     // same number of partitions.
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
-      case (UnspecifiedDistribution, _) => false
-      case (_: BroadcastDistribution, _) => false
-      case _ => true
+      case (_: ClusteredDistribution, _) => true
+      case _ => false
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there are more than one children, we'll need to check partitioning & distribution of them
+    // and see if extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      val specs = childrenIndexes.map(i => {
+        val requiredDist = requiredChildDistributions(i)
+        assert(requiredDist.isInstanceOf[ClusteredDistribution],
+          s"Expected ClusteredDistribution but found ${requiredDist.getClass.getSimpleName}")
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredDist.asInstanceOf[ClusteredDistribution])
+      }).toMap
 
-    if (childrenNumPartitions.size > 1) {
-      // Get the number of partitions which is explicitly required by the distributions.
-      val requiredNumPartitions = {
-        val numPartitionsSet = childrenIndexes.flatMap {
-          index => requiredChildDistributions(index).requiredNumPartitions
-        }.toSet
-        assert(numPartitionsSet.size <= 1,
-          s"$requiredChildDistributions have incompatible requirements of the number of partitions")
-        numPartitionsSet.headOption
-      }
+      // Find out the shuffle spec that gives better parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However this should be sufficient for now since in Spark nodes with multiple children
+      // always have exactly 2 children.
 
-      // If there are non-shuffle children that satisfy the required distribution, we have
-      // some tradeoffs when picking the expected number of shuffle partitions:
-      // 1. We should avoid shuffling these children.
-      // 2. We should have a reasonable parallelism.
-      val nonShuffleChildrenNumPartitions =
-        childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
-          .map(_.outputPartitioning.numPartitions)
-      val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
-        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
-          // Here we pick the max number of partitions among these non-shuffle children.
-          nonShuffleChildrenNumPartitions.max
+      // Whether we should consider `spark.sql.shuffle.partitions` and ensure enough parallelism
+      // during shuffle. To achieve a good trade-off between parallelism and shuffle cost, we only
+      // consider the minimum parallelism iff ALL children need to be re-shuffled.
+      //
+      // A child is considered to be re-shuffled iff:
+      //   1. It can't create partitioning by itself, i.e., `canCreatePartitioning` returns false.
+      //   2. It already has `ShuffleExchangeExec`.
+      //
+      // On the other hand, in scenarios such as:
+      //   HashPartitioning(5) <-> HashPartitioning(6)
+      // while `spark.sql.shuffle.partitions` is 10, we'll only re-shuffle the left side and make it
+      // HashPartitioning(6).
+      val canIgnoreMinPartitions = specs.exists(p =>
+        p._2.canCreatePartitioning && !children(p._1).isInstanceOf[ShuffleExchangeExec]

Review comment:
       Should we'd better use `ShuffleExchangeLike` or `ShuffleExchangeExec`? Will this work with customized columnar shuffle operator other than `ShuffleExchangeExec`?

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ShuffleSpecSuite.scala
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkFunSuite
+/* Implicit conversions */
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+
+class ShuffleSpecSuite extends SparkFunSuite {
+  protected def checkCompatible(
+      left: ShuffleSpec,
+      right: ShuffleSpec,
+      expected: Boolean): Unit = {
+    val actual = left.isCompatibleWith(right)
+    if (actual != expected) {
+      fail(
+        s"""
+           |== Left ShuffleSpec
+           |$left
+           |== Right ShuffleSpec
+           |$right
+           |== Does left is compatible with right? ==

Review comment:
       nit: `Is left compatible with right?`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -68,63 +68,95 @@ case class EnsureRequirements(
     // Get the indexes of children which have specified distribution requirements and need to have
     // same number of partitions.
     val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
-      case (UnspecifiedDistribution, _) => false
-      case (_: BroadcastDistribution, _) => false
-      case _ => true
+      case (_: ClusteredDistribution, _) => true
+      case _ => false
     }.map(_._2)
 
-    val childrenNumPartitions =
-      childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
+    // If there are more than one children, we'll need to check partitioning & distribution of them
+    // and see if extra shuffles are necessary.
+    if (childrenIndexes.length > 1) {
+      val specs = childrenIndexes.map(i => {
+        val requiredDist = requiredChildDistributions(i)
+        assert(requiredDist.isInstanceOf[ClusteredDistribution],
+          s"Expected ClusteredDistribution but found ${requiredDist.getClass.getSimpleName}")
+        i -> children(i).outputPartitioning.createShuffleSpec(
+          requiredDist.asInstanceOf[ClusteredDistribution])
+      }).toMap
 
-    if (childrenNumPartitions.size > 1) {
-      // Get the number of partitions which is explicitly required by the distributions.
-      val requiredNumPartitions = {
-        val numPartitionsSet = childrenIndexes.flatMap {
-          index => requiredChildDistributions(index).requiredNumPartitions
-        }.toSet
-        assert(numPartitionsSet.size <= 1,
-          s"$requiredChildDistributions have incompatible requirements of the number of partitions")
-        numPartitionsSet.headOption
-      }
+      // Find out the shuffle spec that gives better parallelism.
+      //
+      // NOTE: this is not optimal for the case when there are more than 2 children. Consider:
+      //   (10, 10, 11)
+      // it's better to pick 10 in this case since we only need to shuffle one side - we'd need to
+      // shuffle two sides if we pick 11.
+      //
+      // However this should be sufficient for now since in Spark nodes with multiple children
+      // always have exactly 2 children.
 
-      // If there are non-shuffle children that satisfy the required distribution, we have
-      // some tradeoffs when picking the expected number of shuffle partitions:
-      // 1. We should avoid shuffling these children.
-      // 2. We should have a reasonable parallelism.
-      val nonShuffleChildrenNumPartitions =
-        childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
-          .map(_.outputPartitioning.numPartitions)
-      val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
-        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
-          // Here we pick the max number of partitions among these non-shuffle children.
-          nonShuffleChildrenNumPartitions.max
+      // Whether we should consider `spark.sql.shuffle.partitions` and ensure enough parallelism
+      // during shuffle. To achieve a good trade-off between parallelism and shuffle cost, we only
+      // consider the minimum parallelism iff ALL children need to be re-shuffled.
+      //
+      // A child is considered to be re-shuffled iff:
+      //   1. It can't create partitioning by itself, i.e., `canCreatePartitioning` returns false.
+      //   2. It already has `ShuffleExchangeExec`.
+      //
+      // On the other hand, in scenarios such as:
+      //   HashPartitioning(5) <-> HashPartitioning(6)
+      // while `spark.sql.shuffle.partitions` is 10, we'll only re-shuffle the left side and make it
+      // HashPartitioning(6).
+      val canIgnoreMinPartitions = specs.exists(p =>
+        p._2.canCreatePartitioning && !children(p._1).isInstanceOf[ShuffleExchangeExec]
+      )
+      // Choose all the specs that can be used to shuffle other children
+      val candidateSpecs = specs
+          .filter(_._2.canCreatePartitioning)
+          .filter(p => canIgnoreMinPartitions ||
+              children(p._1).outputPartitioning.numPartitions >= conf.defaultNumShufflePartitions)
+      val bestSpec = if (candidateSpecs.isEmpty) {
+        None
+      } else {
+        // When choosing specs, we should consider those children with no `Exchange` node

Review comment:
       nit: `Exchange` -> `ShuffleExchangeExec`? I can understand `Exchange` here, it would be better if we can be more precise here.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
##########
@@ -352,3 +350,142 @@ case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
     case _ => false
   }
 }
+
+/**
+ * This is used in the scenario where an operator has multiple children (e.g., join) and one or more
+ * of which have their own requirement regarding whether its data can be considered as
+ * co-partitioned from others. This offers APIs for:
+ *
+ *   1. Comparing with specs from other children of the operator and check if they are compatible.
+ *      When two specs are compatible, we can say their data are co-partitioned, and Spark will
+ *      potentially able to eliminate shuffle if necessary.
+ *   1. Creating a partitioning that can be used to re-partition another child, so that to make it
+ *      having a compatible partitioning as this node.
+ */
+trait ShuffleSpec {
+  /**
+   * Returns the number of partitions of this shuffle spec
+   */
+  def numPartitions: Int
+
+  /**
+   * Returns true iff this spec is compatible with the provided shuffle spec.
+   *
+   * A true return value means that the data partitioning from this spec can be seen as
+   * co-partitioned with the `other`, and therefore no shuffle is required when joining the two
+   * sides.
+   */
+  def isCompatibleWith(other: ShuffleSpec): Boolean
+
+  /**
+   * Whether this shuffle spec can be used to create partitionings for the other children.
+   */
+  def canCreatePartitioning: Boolean = false
+
+  /**
+   * Creates a partitioning that can be used to re-partitioned the other side with the given
+   * clustering expressions.
+   *
+   * This will only be called when:
+   *  - [[canCreatePartitioning]] returns true.
+   *  - [[isCompatibleWith]] returns false on the side where the `clustering` is from.
+   */
+  def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    throw new UnsupportedOperationException("Operation unsupported for " +
+        s"${getClass.getCanonicalName}")
+}
+
+case object SinglePartitionShuffleSpec extends ShuffleSpec {
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = {
+    other.numPartitions == numPartitions
+  }
+
+  override def canCreatePartitioning: Boolean = true
+
+  override def createPartitioning(clustering: Seq[Expression]): Partitioning =
+    SinglePartition
+
+  override def numPartitions: Int = 1
+}
+
+case class RangeShuffleSpec(
+    numPartitions: Int,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec => numPartitions == 1
+    case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith)
+    case _ => false
+  }
+}
+
+case class HashShuffleSpec(
+    partitioning: HashPartitioning,
+    distribution: ClusteredDistribution) extends ShuffleSpec {
+  lazy val hashKeyPositions =
+    createHashKeyPositions(distribution.clustering, partitioning.expressions)
+
+  override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
+    case SinglePartitionShuffleSpec =>
+      partitioning.numPartitions == 1
+    case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution) =>
+      // we need to check:
+      //  1. both partitioning have the same number of partitions
+      //  2. both partitioning have the same number of expressions
+      //  3. each pair of expression from both has overlapping positions in their
+      //     corresponding distributions.

Review comment:
       Sorry if I miss previous context, but curious why we don't need to check `this.distribution` and `otherDistribution` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org