Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/03/04 10:06:46 UTC

[GitHub] [spark] carsonwang commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

carsonwang commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r261992278
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ##########
 @@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only supports HashPartitioning.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one child because the data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we could try to avoid this Exchange. However, when we reach here,
-            // there are at least two child operators (if there were a single child and we
-            // could avoid the Exchange, supportsCoordinator would be false and we would
-            // not reach here). Although we can make the two children have the same number
-            // of post-shuffle partitions, their numbers of pre-shuffle partitions may be
-            // different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hash
-            // codes after the shuffle. So, even if we can make the child Exchange operator of
-            // the Join produce a number of post-shuffle partitions that matches the number of
-            // partitions of Agg, we cannot say these two children are partitioned in the
-            // same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, the two Aggs shuffle data by the same column of the join
-            // condition. After we use ExchangeCoordinator, these two Aggs may not be
-            // partitioned in the same way. Let's say that Agg1 and Agg2 both have 5
-            // pre-shuffle partitions and 2 post-shuffle partitions. It is possible that Agg1
-            // fetches its pre-shuffle partitions using partitionStartIndices [0, 3], while
-            // Agg2 fetches its pre-shuffle partitions using partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It would be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle
-            // partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
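
To make the co-partitioning problem described in the removed comment concrete, here is a minimal, self-contained Scala sketch. It is not taken from Spark; the helper postShufflePartition is hypothetical and only mirrors how a partitionStartIndices array maps pre-shuffle partition ids to post-shuffle partitions:

    // Five pre-shuffle partitions (ids 0..4) coalesced into two post-shuffle
    // partitions, as in the Agg1/Agg2 example above.
    val agg1Starts = Array(0, 3) // Agg1: post 0 = pre 0..2, post 1 = pre 3..4
    val agg2Starts = Array(0, 4) // Agg2: post 0 = pre 0..3, post 1 = pre 4

    // Hypothetical helper: the post-shuffle partition that a given pre-shuffle
    // partition id falls into, given the start index of each post partition.
    def postShufflePartition(startIndices: Array[Int], preId: Int): Int =
      startIndices.lastIndexWhere(_ <= preId)

    // Pre-shuffle partition 3 lands in post-shuffle partition 1 on one side and
    // post-shuffle partition 0 on the other, so rows with the same join key can
    // end up in different tasks: the two sides are not co-partitioned.
    assert(postShufflePartition(agg1Starts, 3) == 1)
    assert(postShufflePartition(agg2Starts, 3) == 0)

This is why the rule conservatively inserts an Exchange for non-Exchange children whenever a coordinator is used.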
 
 Review comment:
  If the user is familiar with the AE configuration, can he just set maxNumPostShufflePartitions for the large join when he enables AE? If he set spark.sql.shuffle.partitions to 10K in non-AE mode, I expect he will set a higher value for maxNumPostShufflePartitions, such as 20K, in AE mode. AE can then find a good reducer number between 1 and 20K, which is usually a better value than a fixed 10K.
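
  As a rough sketch of that configuration pattern (illustrative only; the maxNumPostShufflePartitions property name follows the SPARK-23128 proposal at the time of this review and may be named differently in released Spark versions):

      import org.apache.spark.sql.SparkSession

      val spark = SparkSession.builder().appName("ae-config-sketch").getOrCreate()

      // Non-AE mode: pin a fixed reducer count for the large join.
      spark.conf.set("spark.sql.shuffle.partitions", "10000")

      // AE mode: enable adaptive execution and raise the upper bound instead;
      // AE then searches for a good reducer number between 1 and 20000,
      // usually a better value than the fixed 10000.
      spark.conf.set("spark.sql.adaptive.enabled", "true")
      spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "20000")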
