Posted to commits@spark.apache.org by we...@apache.org on 2019/07/04 08:03:29 UTC
[spark] branch master updated: [SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cec6a32 [SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
cec6a32 is described below
commit cec6a329044fa7d8a4f3da3871dbacac95cc38ed
Author: Carson Wang <ca...@intel.com>
AuthorDate: Thu Jul 4 16:03:04 2019 +0800
[SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
## What changes were proposed in this pull request?
This implements a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in #24706. The rule adjusts the number of post-shuffle partitions based on map output statistics.
## How was this patch tested?
Added ReduceNumShufflePartitionsSuite
Closes #24978 from carsonwang/reduceNumShufflePartitions.
Authored-by: Carson Wang <ca...@intel.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 33 ++-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +
.../sql/execution/adaptive/QueryStageExec.scala | 14 +-
.../adaptive/ReduceNumShufflePartitions.scala | 200 +++++++++++++++
.../execution/exchange/EnsureRequirements.scala | 120 +--------
.../execution/exchange/ExchangeCoordinator.scala | 277 ---------------------
.../execution/exchange/ShuffleExchangeExec.scala | 68 +----
.../sql/execution/streaming/StreamExecution.scala | 2 +-
.../sql/streaming/StreamingQueryManager.scala | 4 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
.../apache/spark/sql/execution/PlannerSuite.scala | 11 +-
...scala => ReduceNumShufflePartitionsSuite.scala} | 256 +++++++++++++------
.../adaptive/AdaptiveQueryExecSuite.scala | 25 ++
13 files changed, 465 insertions(+), 549 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e4c58ef..af67632 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -311,16 +311,30 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
+ buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+ .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
+ "post-shuffle partitions based on map output statistics.")
+ .booleanConf
+ .createWithDefault(true)
+
val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
- .internal()
- .doc("The advisory minimal number of post-shuffle partitions provided to " +
- "ExchangeCoordinator. This setting is used in our test to make sure we " +
- "have enough parallelism to expose issues that will not be exposed with a " +
- "single partition. When the value is a non-positive value, this setting will " +
- "not be provided to ExchangeCoordinator.")
+ .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
.intConf
- .createWithDefault(-1)
+ .checkValue(_ > 0, "The minimum shuffle partition number " +
+ "must be a positive integer.")
+ .createWithDefault(1)
+
+ val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
+ buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+ .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
+ "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
+ "spark.sql.shuffle.partitions")
+ .intConf
+ .checkValue(_ > 0, "The maximum shuffle partition number " +
+ "must be a positive integer.")
+ .createOptional
val SUBEXPRESSION_ELIMINATION_ENABLED =
buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1939,9 +1953,14 @@ class SQLConf extends Serializable with Logging {
def adaptiveExecutionEnabled: Boolean =
getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
+ def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+
def minNumPostShufflePartitions: Int =
getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+ def maxNumPostShufflePartitions: Int =
+ getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
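As a hedged illustration of the configs added in the hunk above, the Scala sketch below shows how they might be set on a SparkSession. The key names are taken from the diff; the session setup, app name, target size value, and the query are illustrative assumptions, and note that the rule additionally requires the runtime re-optimization framework to be enabled (that config's key string is not shown in this diff).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ReducePostShufflePartitionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("reduce-post-shuffle-partitions-demo") // illustrative name
      // Enable coalescing of post-shuffle partitions (defaults to true).
      .config("spark.sql.adaptive.reducePostShufflePartitions.enabled", "true")
      // Advisory lower bound on the number of post-shuffle partitions.
      .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
      // Advisory upper bound; also used as the initial number of
      // pre-shuffle partitions (defaults to spark.sql.shuffle.partitions).
      .config("spark.sql.adaptive.maxNumPostShufflePartitions", "200")
      // Pre-existing target input size per post-shuffle partition.
      .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
        (64 * 1024 * 1024).toString)
      .getOrCreate()

    // A small aggregation that shuffles; with the rule active, the number of
    // post-shuffle partitions is reduced based on map output statistics.
    spark.range(0, 1000000).groupBy(col("id") % 10).count().show()
    spark.stop()
  }
}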
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 606fbd8..0708878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
+import org.apache.spark.sql.execution.adaptive.rule.ReduceNumShufflePartitions
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
import org.apache.spark.sql.internal.SQLConf
@@ -82,6 +83,7 @@ case class AdaptiveSparkPlanExec(
// A list of physical optimizer rules to be applied to a new stage before its execution. These
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+ ReduceNumShufflePartitions(conf),
CollapseCodegenStages(conf)
)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 98cb7d0..c803ca3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -161,9 +161,21 @@ case class BroadcastQueryStageExec(
}
}
+object ShuffleQueryStageExec {
+ /**
+ * Returns true if the plan is a [[ShuffleQueryStageExec]] or a reused [[ShuffleQueryStageExec]].
+ */
+ def isShuffleQueryStageExec(plan: SparkPlan): Boolean = plan match {
+ case r: ReusedQueryStageExec => isShuffleQueryStageExec(r.plan)
+ case _: ShuffleQueryStageExec => true
+ case _ => false
+ }
+}
+
object BroadcastQueryStageExec {
/**
- * Returns if the plan is a [[BroadcastQueryStageExec]] or a reused [[BroadcastQueryStageExec]].
+ * Returns true if the plan is a [[BroadcastQueryStageExec]] or a reused
+ * [[BroadcastQueryStageExec]].
*/
def isBroadcastQueryStageExec(plan: SparkPlan): Boolean = plan match {
case r: ReusedQueryStageExec => isBroadcastQueryStageExec(r.plan)
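A self-contained sketch of the unwrapping pattern used by both helper objects above: a reused query stage wraps the original stage, so the check must recurse through the wrapper. The node types here are simplified stand-ins, not the real Spark classes.

sealed trait PlanNode
case class ReusedStage(child: PlanNode) extends PlanNode
case object ShuffleStage extends PlanNode
case object OtherNode extends PlanNode

object UnwrapDemo {
  def isShuffleStage(plan: PlanNode): Boolean = plan match {
    case ReusedStage(child) => isShuffleStage(child) // recurse through wrapper
    case ShuffleStage       => true
    case _                  => false
  }

  def main(args: Array[String]): Unit = {
    println(isShuffleStage(ShuffleStage))              // true
    println(isShuffleStage(ReusedStage(ShuffleStage))) // true (unwrapped)
    println(isShuffleStage(OtherNode))                 // false
  }
}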
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
new file mode 100644
index 0000000..d93eb76
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive.rule
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A rule to adjust the post shuffle partitions based on the map output statistics.
+ *
+ * The strategy used to determine the number of post-shuffle partitions is described as follows.
+ * To determine the number of post-shuffle partitions, we have a target input size for a
+ * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do
+ * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single
+ * post-shuffle partition until adding another pre-shuffle partition would cause the size of a
+ * post-shuffle partition to be greater than the target size.
+ *
+ * For example, we have two stages with the following pre-shuffle partition size statistics:
+ * stage 1: [100 MiB, 20 MiB, 100 MiB, 10 MiB, 30 MiB]
+ * stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
+ * assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
+ * which are:
+ * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
+ * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
+ * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
+ * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
+ */
+case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (!conf.reducePostShufflePartitionsEnabled) {
+ return plan
+ }
+ if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
+ // If not all leaf nodes are query stages, it's not safe to reduce the number of
+ // shuffle partitions, because we may break the assumption that all children of a Spark
+ // plan have the same number of output partitions.
+ plan
+ } else {
+ val shuffleStages = plan.collect {
+ case stage: ShuffleQueryStageExec => stage
+ case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+ }
+ val shuffleMetrics = shuffleStages.map { stage =>
+ val metricsFuture = stage.mapOutputStatisticsFuture
+ assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
+ ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
+ }
+
+ // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
+ // so we should skip it when calculating the `partitionStartIndices`.
+ val validMetrics = shuffleMetrics.filter(_ != null)
+ // We may get different pre-shuffle partition numbers if the user calls repartition
+ // manually. We don't reduce the shuffle partition number in that case.
+ val distinctNumPreShufflePartitions =
+ validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
+
+ if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
+ val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
+ // This transformation adds new nodes, so we must use `transformUp` here.
+ plan.transformUp {
+ // Even for a shuffle exchange whose input RDD has 0 partitions, we should still update
+ // its `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
+ // number of output partitions.
+ case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
+ CoalescedShuffleReaderExec(stage, partitionStartIndices)
+ }
+ } else {
+ plan
+ }
+ }
+ }
+
+ /**
+ * Estimates partition start indices for post-shuffle partitions based on
+ * mapOutputStatistics provided by all pre-shuffle stages.
+ */
+ // visible for testing.
+ private[sql] def estimatePartitionStartIndices(
+ mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
+ val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
+ val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
+ // Because minNumPostShufflePartitions is set, we may need to use a value smaller than
+ // advisoryTargetPostShuffleInputSize as the target input size of a post-shuffle task.
+ val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
+ // The max here is to make sure that when we have an empty table, we
+ // only have a single post-shuffle partition.
+ // There is no particular reason that we pick 16. We just need a number to
+ // prevent maxPostShuffleInputSize from being set to 0.
+ val maxPostShuffleInputSize = math.max(
+ math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16)
+ val targetPostShuffleInputSize =
+ math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
+
+ logInfo(
+ s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
+ s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
+
+ // Make sure we do get the same number of pre-shuffle partitions for those stages.
+ val distinctNumPreShufflePartitions =
+ mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
+ // The reason that we are expecting a single value of the number of pre-shuffle partitions
+ // is that when we add Exchanges, we set the number of pre-shuffle partitions
+ // (i.e. map output partitions) using a static setting, which is the value of
+ // spark.sql.shuffle.partitions. Even if two input RDDs have different
+ // numbers of partitions, they will have the same number of pre-shuffle partitions
+ // (i.e. map output partitions).
+ assert(
+ distinctNumPreShufflePartitions.length == 1,
+ "There should be only one distinct value of the number pre-shuffle partitions " +
+ "among registered Exchange operator.")
+ val numPreShufflePartitions = distinctNumPreShufflePartitions.head
+
+ val partitionStartIndices = ArrayBuffer[Int]()
+ // The first element of partitionStartIndices is always 0.
+ partitionStartIndices += 0
+
+ var postShuffleInputSize = 0L
+
+ var i = 0
+ while (i < numPreShufflePartitions) {
+ // We calculate the total size of the ith pre-shuffle partition across all pre-shuffle stages.
+ // Then, we add the total size to postShuffleInputSize.
+ var nextShuffleInputSize = 0L
+ var j = 0
+ while (j < mapOutputStatistics.length) {
+ nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
+ j += 1
+ }
+
+ // If including the nextShuffleInputSize would exceed the target partition size, then start a
+ // new partition.
+ if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
+ partitionStartIndices += i
+ // reset postShuffleInputSize.
+ postShuffleInputSize = nextShuffleInputSize
+ } else {
+ postShuffleInputSize += nextShuffleInputSize
+ }
+
+ i += 1
+ }
+
+ partitionStartIndices.toArray
+ }
+}
+
+case class CoalescedShuffleReaderExec(
+ child: QueryStageExec,
+ partitionStartIndices: Array[Int]) extends UnaryExecNode {
+
+ override def output: Seq[Attribute] = child.output
+
+ override def doCanonicalize(): SparkPlan = child.canonicalized
+
+ override def outputPartitioning: Partitioning = {
+ UnknownPartitioning(partitionStartIndices.length)
+ }
+
+ private var cachedShuffleRDD: ShuffledRowRDD = null
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ if (cachedShuffleRDD == null) {
+ cachedShuffleRDD = child match {
+ case stage: ShuffleQueryStageExec =>
+ stage.plan.createShuffledRDD(Some(partitionStartIndices))
+ case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+ stage.plan.createShuffledRDD(Some(partitionStartIndices))
+ }
+ }
+ cachedShuffleRDD
+ }
+}
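To make the packing strategy in the scaladoc above concrete, here is a minimal, self-contained Scala sketch of the same greedy pass over per-partition sizes, run against the example statistics from the scaladoc. The names are illustrative only; this is not the Spark implementation, which additionally derives the target size from the min-partitions and target-input-size configs.

object PackingDemo {
  // Packs pre-shuffle partitions with contiguous indices into post-shuffle
  // partitions, starting a new one whenever the accumulated size would
  // exceed targetSize. Returns the start indices of the packed partitions.
  def estimateStartIndices(
      bytesByStage: Array[Array[Long]],
      targetSize: Long): Array[Int] = {
    val numPartitions = bytesByStage.head.length
    val indices = scala.collection.mutable.ArrayBuffer(0)
    var currentSize = 0L
    for (i <- 0 until numPartitions) {
      // Total size of the ith pre-shuffle partition across all stages.
      val partitionSize = bytesByStage.map(_(i)).sum
      if (i > 0 && currentSize + partitionSize > targetSize) {
        indices += i // start a new post-shuffle partition
        currentSize = partitionSize
      } else {
        currentSize += partitionSize
      }
    }
    indices.toArray
  }

  def main(args: Array[String]): Unit = {
    val MiB = 1024L * 1024L
    val stage1 = Array(100L, 20L, 100L, 10L, 30L).map(_ * MiB)
    val stage2 = Array(10L, 10L, 70L, 5L, 5L).map(_ * MiB)
    // Prints "0, 1, 2, 3": four post-shuffle partitions, matching the
    // worked example in the scaladoc (partitions 3 and 4 are packed).
    println(estimateStartIndices(Array(stage1, stage2), 128L * MiB).mkString(", "))
  }
}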
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index d2d5011..b7196d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,107 +36,12 @@ import org.apache.spark.sql.internal.SQLConf
* the input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
- private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
- private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
- private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
- private def minNumPostShufflePartitions: Option[Int] = {
- val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
- if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
- }
-
- /**
- * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
- * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
- */
- private def withExchangeCoordinator(
- children: Seq[SparkPlan],
- requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
- val supportsCoordinator =
- if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
- // Right now, ExchangeCoordinator only support HashPartitionings.
- children.forall {
- case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
- case child =>
- child.outputPartitioning match {
- case hash: HashPartitioning => true
- case collection: PartitioningCollection =>
- collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
- case _ => false
- }
- }
- } else {
- // In this case, although we do not have Exchange operators, we may still need to
- // shuffle data when we have more than one children because data generated by
- // these children may not be partitioned in the same way.
- // Please see the comment in withCoordinator for more details.
- val supportsDistribution = requiredChildDistributions.forall { dist =>
- dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
- }
- children.length > 1 && supportsDistribution
- }
-
- val withCoordinator =
- if (adaptiveExecutionEnabled && supportsCoordinator) {
- val coordinator =
- new ExchangeCoordinator(
- targetPostShuffleInputSize,
- minNumPostShufflePartitions)
- children.zip(requiredChildDistributions).map {
- case (e: ShuffleExchangeExec, _) =>
- // This child is an Exchange, we need to add the coordinator.
- e.copy(coordinator = Some(coordinator))
- case (child, distribution) =>
- // If this child is not an Exchange, we need to add an Exchange for now.
- // Ideally, we can try to avoid this Exchange. However, when we reach here,
- // there are at least two children operators (because if there is a single child
- // and we can avoid Exchange, supportsCoordinator will be false and we
- // will not reach here.). Although we can make two children have the same number of
- // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
- // For example, let's say we have the following plan
- // Join
- // / \
- // Agg Exchange
- // / \
- // Exchange t2
- // /
- // t1
- // In this case, because a post-shuffle partition can include multiple pre-shuffle
- // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
- // after shuffle. So, even we can use the child Exchange operator of the Join to
- // have a number of post-shuffle partitions that matches the number of partitions of
- // Agg, we cannot say these two children are partitioned in the same way.
- // Here is another case
- // Join
- // / \
- // Agg1 Agg2
- // / \
- // Exchange1 Exchange2
- // / \
- // t1 t2
- // In this case, two Aggs shuffle data with the same column of the join condition.
- // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
- // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
- // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
- // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
- // pre-shuffle partitions by using another partitionStartIndices [0, 4].
- // So, Agg1 and Agg2 are actually not co-partitioned.
- //
- // It will be great to introduce a new Partitioning to represent the post-shuffle
- // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
- val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
- assert(targetPartitioning.isInstanceOf[HashPartitioning])
- ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
- }
- } else {
- // If we do not need ExchangeCoordinator, the original children are returned.
- children
- }
-
- withCoordinator
- }
+ private def defaultNumPreShufflePartitions: Int =
+ if (conf.runtimeReoptimizationEnabled) {
+ conf.maxNumPostShufflePartitions
+ } else {
+ conf.numShufflePartitions
+ }
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -189,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
child match {
// If child is an exchange, we replace it with a new one having defaultPartitioning.
- case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
+ case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c)
case _ => ShuffleExchangeExec(defaultPartitioning, child)
}
}
@@ -198,15 +103,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
}
- // Now, we need to add ExchangeCoordinator if necessary.
- // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
- // However, with the way that we plan the query, we do not have a place where we have a
- // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
- // at here for now.
- // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
- // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
- children = withExchangeCoordinator(children, requiredChildDistributions)
-
// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
@@ -295,7 +191,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
// TODO: remove this after we create a physical operator for `RepartitionByExpression`.
- case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+ case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
child.outputPartitioning match {
case lower: HashPartitioning if upper.semanticEquals(lower) => child
case _ => operator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
deleted file mode 100644
index c99bf45..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.exchange
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
-
-/**
- * A coordinator used to determines how we shuffle data between stages generated by Spark SQL.
- * Right now, the work of this coordinator is to determine the number of post-shuffle partitions
- * for a stage that needs to fetch shuffle data from one or multiple stages.
- *
- * A coordinator is constructed with three parameters, `numExchanges`,
- * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- * - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
- * registered to this coordinator. So, when we start to do any actual work, we have a way to
- * make sure that we have got expected number of [[ShuffleExchangeExec]]s.
- * - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
- * input data size. With this parameter, we can estimate the number of post-shuffle partitions.
- * This parameter is configured through
- * `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
- * - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
- * will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
- * partitions.
- *
- * The workflow of this coordinator is described as follows:
- * - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
- * if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
- * This happens in the `doPrepare` method.
- * - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
- * coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
- * [[ShuffledRowRDD]].
- * If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
- * will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
- * - If this coordinator has not made the decision on how to shuffle data, it will ask those
- * registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
- * size statistics of pre-shuffle partitions, this coordinator will determine the number of
- * post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
- * to a single post-shuffle partition whenever necessary.
- * - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- * [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
- * coordinator can lookup the corresponding [[RDD]].
- *
- * The strategy used to determine the number of post-shuffle partitions is described as follows.
- * To determine the number of post-shuffle partitions, we have a target input size for a
- * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
- * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
- * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
- * greater than the target size.
- *
- * For example, we have two stages with the following pre-shuffle partition size statistics:
- * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
- * stage 2: [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
- * assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
- * which are:
- * - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
- * - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
- * - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
- * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
- */
-class ExchangeCoordinator(
- advisoryTargetPostShuffleInputSize: Long,
- minNumPostShufflePartitions: Option[Int] = None)
- extends Logging {
-
- // The registered Exchange operators.
- private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
-
- // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the
- // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is
- // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails
- // in `doEstimationIfNecessary`.
- private[this] lazy val numExchanges = exchanges.size
-
- // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
- private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
- new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
-
- // A boolean that indicates if this coordinator has made decision on how to shuffle data.
- // This variable will only be updated by doEstimationIfNecessary, which is protected by
- // synchronized.
- @volatile private[this] var estimated: Boolean = false
-
- /**
- * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
- * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
- */
- @GuardedBy("this")
- def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
- exchanges += exchange
- }
-
- def isEstimated: Boolean = estimated
-
- /**
- * Estimates partition start indices for post-shuffle partitions based on
- * mapOutputStatistics provided by all pre-shuffle stages.
- */
- def estimatePartitionStartIndices(
- mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
- // If minNumPostShufflePartitions is defined, it is possible that we need to use a
- // value less than advisoryTargetPostShuffleInputSize as the target input size of
- // a post shuffle task.
- val targetPostShuffleInputSize = minNumPostShufflePartitions match {
- case Some(numPartitions) =>
- val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
- // The max at here is to make sure that when we have an empty table, we
- // only have a single post-shuffle partition.
- // There is no particular reason that we pick 16. We just need a number to
- // prevent maxPostShuffleInputSize from being set to 0.
- val maxPostShuffleInputSize =
- math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
- math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
-
- case None => advisoryTargetPostShuffleInputSize
- }
-
- logInfo(
- s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
- s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
-
- // Make sure we do get the same number of pre-shuffle partitions for those stages.
- val distinctNumPreShufflePartitions =
- mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
- // The reason that we are expecting a single value of the number of pre-shuffle partitions
- // is that when we add Exchanges, we set the number of pre-shuffle partitions
- // (i.e. map output partitions) using a static setting, which is the value of
- // spark.sql.shuffle.partitions. Even if two input RDDs are having different
- // number of partitions, they will have the same number of pre-shuffle partitions
- // (i.e. map output partitions).
- assert(
- distinctNumPreShufflePartitions.length == 1,
- "There should be only one distinct value of the number pre-shuffle partitions " +
- "among registered Exchange operator.")
- val numPreShufflePartitions = distinctNumPreShufflePartitions.head
-
- val partitionStartIndices = ArrayBuffer[Int]()
- // The first element of partitionStartIndices is always 0.
- partitionStartIndices += 0
-
- var postShuffleInputSize = 0L
-
- var i = 0
- while (i < numPreShufflePartitions) {
- // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
- // Then, we add the total size to postShuffleInputSize.
- var nextShuffleInputSize = 0L
- var j = 0
- while (j < mapOutputStatistics.length) {
- nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
- j += 1
- }
-
- // If including the nextShuffleInputSize would exceed the target partition size, then start a
- // new partition.
- if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
- partitionStartIndices += i
- // reset postShuffleInputSize.
- postShuffleInputSize = nextShuffleInputSize
- } else postShuffleInputSize += nextShuffleInputSize
-
- i += 1
- }
-
- partitionStartIndices.toArray
- }
-
- @GuardedBy("this")
- private def doEstimationIfNecessary(): Unit = synchronized {
- // It is unlikely that this method will be called from multiple threads
- // (when multiple threads trigger the execution of THIS physical)
- // because in common use cases, we will create new physical plan after
- // users apply operations (e.g. projection) to an existing DataFrame.
- // However, if it happens, we have synchronized to make sure only one
- // thread will trigger the job submission.
- if (!estimated) {
- // Make sure we have the expected number of registered Exchange operators.
- assert(exchanges.length == numExchanges)
-
- val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
-
- // Submit all map stages
- val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
- val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
- var i = 0
- while (i < numExchanges) {
- val exchange = exchanges(i)
- val shuffleDependency = exchange.shuffleDependency
- shuffleDependencies += shuffleDependency
- if (shuffleDependency.rdd.partitions.length != 0) {
- // submitMapStage does not accept RDD with 0 partition.
- // So, we will not submit this dependency.
- submittedStageFutures +=
- exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
- }
- i += 1
- }
-
- // Wait for the finishes of those submitted map stages.
- val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
- var j = 0
- while (j < submittedStageFutures.length) {
- // This call is a blocking call. If the stage has not finished, we will wait at here.
- mapOutputStatistics(j) = submittedStageFutures(j).get()
- j += 1
- }
-
- // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
- // a stage when the number of partitions of this dependency is 0.
- assert(mapOutputStatistics.length <= numExchanges)
-
- // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
- // number of post-shuffle partitions.
- val partitionStartIndices =
- if (mapOutputStatistics.length == 0) {
- Array.empty[Int]
- } else {
- estimatePartitionStartIndices(mapOutputStatistics)
- }
-
- var k = 0
- while (k < numExchanges) {
- val exchange = exchanges(k)
- val rdd =
- exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices))
- newPostShuffleRDDs.put(exchange, rdd)
-
- k += 1
- }
-
- // Finally, we set postShuffleRDDs and estimated.
- assert(postShuffleRDDs.isEmpty)
- assert(newPostShuffleRDDs.size() == numExchanges)
- postShuffleRDDs.putAll(newPostShuffleRDDs)
- estimated = true
- }
- }
-
- def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
- doEstimationIfNecessary()
-
- if (!postShuffleRDDs.containsKey(exchange)) {
- throw new IllegalStateException(
- s"The given $exchange is not registered in this coordinator.")
- }
-
- postShuffleRDDs.get(exchange)
- }
-
- override def toString: String = {
- s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 31f75e3..5d0208f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -39,12 +39,11 @@ import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
/**
- * Performs a shuffle that will result in the desired `newPartitioning`.
+ * Performs a shuffle that will result in the desired partitioning.
*/
case class ShuffleExchangeExec(
- var newPartitioning: Partitioning,
- child: SparkPlan,
- @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
+ override val outputPartitioning: Partitioning,
+ child: SparkPlan) extends Exchange {
// NOTE: coordinator can be null after serialization/deserialization,
// e.g. it can be null on the Executor side
@@ -56,37 +55,11 @@ case class ShuffleExchangeExec(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ readMetrics ++ writeMetrics
- override def nodeName: String = {
- val extraInfo = coordinator match {
- case Some(exchangeCoordinator) =>
- s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
- case _ => ""
- }
-
- val simpleNodeName = "Exchange"
- s"$simpleNodeName$extraInfo"
- }
-
- override def outputPartitioning: Partitioning = newPartitioning
+ override def nodeName: String = "Exchange"
private val serializer: Serializer =
new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
- override protected def doPrepare(): Unit = {
- // If an ExchangeCoordinator is needed, we register this Exchange operator
- // to the coordinator when we do prepare. It is important to make sure
- // we register this operator right before the execution instead of register it
- // in the constructor because it is possible that we create new instances of
- // Exchange operators when we transform the physical plan
- // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
- // So, we should only call registerExchange just before we start to execute
- // the plan.
- coordinator match {
- case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
- case _ =>
- }
- }
-
@transient lazy val inputRDD: RDD[InternalRow] = child.execute()
/**
@@ -99,28 +72,13 @@ case class ShuffleExchangeExec(
ShuffleExchangeExec.prepareShuffleDependency(
inputRDD,
child.output,
- newPartitioning,
+ outputPartitioning,
serializer,
writeMetrics)
}
- /**
- * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
- * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
- * partition start indices array. If this optional array is defined, the returned
- * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
- */
- private[exchange] def preparePostShuffleRDD(
- shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
- specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
- // If an array of partition start indices is provided, we need to use this array
- // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
- // update the number of post-shuffle partitions.
- specifiedPartitionStartIndices.foreach { indices =>
- assert(newPartitioning.isInstanceOf[HashPartitioning])
- newPartitioning = UnknownPartitioning(indices.length)
- }
- new ShuffledRowRDD(shuffleDependency, readMetrics, specifiedPartitionStartIndices)
+ def createShuffledRDD(partitionStartIndices: Option[Array[Int]]): ShuffledRowRDD = {
+ new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices)
}
/**
@@ -131,23 +89,13 @@ case class ShuffleExchangeExec(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
// Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) {
- cachedShuffleRDD = coordinator match {
- case Some(exchangeCoordinator) =>
- val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
- assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
- shuffleRDD
- case _ =>
- preparePostShuffleRDD(shuffleDependency)
- }
+ cachedShuffleRDD = createShuffledRDD(None)
}
cachedShuffleRDD
}
}
object ShuffleExchangeExec {
- def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
- ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
- }
/**
* Determines whether records must be defensively copied before being sent to the shuffle.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7c1f6ca..eb95719 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -320,7 +320,7 @@ abstract class StreamExecution(
logicalPlan
// Adaptive execution can change num shuffle partitions, disallow
- sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+ sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
offsetSeqMetadata = OffsetSeqMetadata(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 1705d56..0b472c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
- if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
- logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+ if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
+ logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
"is not supported in streaming DataFrames/Datasets and will be disabled.")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4b08a4b..efd5db1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1365,7 +1365,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val agg = cp.groupBy('id % 2).agg(count('id))
agg.queryExecution.executedPlan.collectFirst {
- case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
+ case ShuffleExchangeExec(_, _: RDDScanExec) =>
case BroadcastExchangeExec(_, _: RDDScanExec) =>
}.foreach { _ =>
fail(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index faa7cbb..c2d9e54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -413,8 +413,7 @@ class PlannerSuite extends SharedSQLContext {
val inputPlan = ShuffleExchangeExec(
partitioning,
- DummySparkPlan(outputPartitioning = partitioning),
- None)
+ DummySparkPlan(outputPartitioning = partitioning))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
@@ -429,8 +428,7 @@ class PlannerSuite extends SharedSQLContext {
val inputPlan = ShuffleExchangeExec(
partitioning,
- DummySparkPlan(outputPartitioning = partitioning),
- None)
+ DummySparkPlan(outputPartitioning = partitioning))
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
@@ -452,7 +450,7 @@ class PlannerSuite extends SharedSQLContext {
val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
val shuffle = outputPlan.collect { case e: ShuffleExchangeExec => e }
assert(shuffle.size === 1)
- assert(shuffle.head.newPartitioning === finalPartitioning)
+ assert(shuffle.head.outputPartitioning === finalPartitioning)
}
test("Reuse exchanges") {
@@ -464,8 +462,7 @@ class PlannerSuite extends SharedSQLContext {
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
- requiredChildOrdering = Seq(Seq.empty)),
- None)
+ requiredChildOrdering = Seq(Seq.empty)))
val inputPlan = SortMergeJoinExec(
Literal(1) :: Nil,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
similarity index 63%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index 74f33f6..fea77c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -22,11 +22,12 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.adaptive.rule.{CoalescedShuffleReaderExec, ReduceNumShufflePartitions}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
-class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
+class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
private var originalActiveSparkSession: Option[SparkSession] = _
private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -51,7 +52,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
}
private def checkEstimation(
- coordinator: ExchangeCoordinator,
+ rule: ReduceNumShufflePartitions,
bytesByPartitionIdArray: Array[Array[Long]],
expectedPartitionStartIndices: Array[Int]): Unit = {
val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
@@ -59,18 +60,27 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
new MapOutputStatistics(index, bytesByPartitionId)
}
val estimatedPartitionStartIndices =
- coordinator.estimatePartitionStartIndices(mapOutputStatistics)
+ rule.estimatePartitionStartIndices(mapOutputStatistics)
assert(estimatedPartitionStartIndices === expectedPartitionStartIndices)
}
+ private def createReduceNumShufflePartitionsRule(
+ advisoryTargetPostShuffleInputSize: Long,
+ minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = {
+ val conf = new SQLConf().copy(
+ SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize,
+ SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions)
+ ReduceNumShufflePartitions(conf)
+ }
+
test("test estimatePartitionStartIndices - 1 Exchange") {
- val coordinator = new ExchangeCoordinator(100L)
+ val rule = createReduceNumShufflePartitionsRule(100L)
{
// All bytes per partition are 0.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
{
@@ -78,40 +88,40 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// 1 post-shuffle partition is needed.
val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
{
// 2 post-shuffle partitions are needed.
val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
val expectedPartitionStartIndices = Array[Int](0, 3)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
{
// There are a few large pre-shuffle partitions.
val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
{
// All pre-shuffle partitions are larger than the targeted size.
val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
{
// The last pre-shuffle partition is in a single post-shuffle partition.
val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
val expectedPartitionStartIndices = Array[Int](0, 4)
- checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+ checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
}
}
test("test estimatePartitionStartIndices - 2 Exchanges") {
- val coordinator = new ExchangeCoordinator(100L)
+ val rule = createReduceNumShufflePartitionsRule(100L)
{
// If there are multiple values of the number of pre-shuffle partitions,
@@ -122,7 +132,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
Array(
new MapOutputStatistics(0, bytesByPartitionId1),
new MapOutputStatistics(1, bytesByPartitionId2))
- intercept[AssertionError](coordinator.estimatePartitionStartIndices(mapOutputStatistics))
+ intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
}
{
@@ -131,7 +141,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -143,7 +153,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -154,7 +164,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 2, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -165,7 +175,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -176,7 +186,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -187,7 +197,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -198,14 +208,14 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
}
test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
- val coordinator = new ExchangeCoordinator(100L, Some(2))
+ val rule = createReduceNumShufflePartitionsRule(100L, 2)
{
// The minimal number of post-shuffle partitions is not enforced because
@@ -214,7 +224,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionStartIndices = Array[Int](0)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -225,7 +235,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
val expectedPartitionStartIndices = Array[Int](0, 3)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -236,7 +246,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
checkEstimation(
- coordinator,
+ rule,
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionStartIndices)
}
@@ -257,24 +267,24 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
def withSparkSession(
f: SparkSession => Unit,
- targetNumPostShufflePartitions: Int,
+ targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = {
val sparkConf =
new SparkConf(false)
.setMaster("local[*]")
.setAppName("test")
.set(UI_ENABLED, false)
- .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
- .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+ .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
+ .set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set(
SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
- targetNumPostShufflePartitions.toString)
+ targetPostShuffleInputSize.toString)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
case None =>
- sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
+ sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
}
val spark = SparkSession.builder()
@@ -304,25 +314,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
- val exchanges = agg.queryExecution.executedPlan.collect {
- case e: ShuffleExchangeExec => e
+ val finalPlan = agg.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val shuffleReaders = finalPlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
}
- assert(exchanges.length === 1)
+ assert(shuffleReaders.length === 1)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 5)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === numPartitions)
}
case None =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 3)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === 3)
}
}
}
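Every rewritten assertion in this suite now follows the same pattern: with
runtime reoptimization enabled, the root of the executed plan is an
AdaptiveSparkPlanExec, and CoalescedShuffleReaderExec nodes exist only in its
final, re-optimized executedPlan. An illustrative helper (not part of this
commit) capturing the unwrap the tests repeat:

    def collectCoalescedReaders(df: org.apache.spark.sql.DataFrame)
        : Seq[CoalescedShuffleReaderExec] = {
      val finalPlan = df.queryExecution.executedPlan
        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
      finalPlan.collect { case r: CoalescedShuffleReaderExec => r }
    }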
@@ -355,25 +361,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
- val exchanges = join.queryExecution.executedPlan.collect {
- case e: ShuffleExchangeExec => e
+ val finalPlan = join.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val shuffleReaders = finalPlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
}
- assert(exchanges.length === 2)
+ assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 5)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === numPartitions)
}
case None =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 2)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === 2)
}
}
}
@@ -411,26 +413,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
- val exchanges = join.queryExecution.executedPlan.collect {
- case e: ShuffleExchangeExec => e
+ val finalPlan = join.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val shuffleReaders = finalPlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
}
- assert(exchanges.length === 4)
+ assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 5)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === numPartitions)
}
case None =>
- assert(exchanges.forall(_.coordinator.isDefined))
- assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === 2)
+ }
}
}
- withSparkSession(test, 6644, minNumPostShufflePartitions)
+ withSparkSession(test, 16384, minNumPostShufflePartitions)
}
test(s"determining the number of reducers: complex query 2$testNameNote") {
@@ -463,39 +465,131 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
// Then, let's look at the number of post-shuffle partitions estimated
// by the ExchangeCoordinator.
- val exchanges = join.queryExecution.executedPlan.collect {
- case e: ShuffleExchangeExec => e
+ val finalPlan = join.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val shuffleReaders = finalPlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
}
- assert(exchanges.length === 3)
+ assert(shuffleReaders.length === 2)
minNumPostShufflePartitions match {
case Some(numPartitions) =>
- exchanges.foreach {
- case e: ShuffleExchangeExec =>
- assert(e.coordinator.isDefined)
- assert(e.outputPartitioning.numPartitions === 5)
- case o =>
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === numPartitions)
}
case None =>
- assert(exchanges.forall(_.coordinator.isDefined))
- assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === 3)
+ }
}
}
- withSparkSession(test, 6144, minNumPostShufflePartitions)
+ withSparkSession(test, 12000, minNumPostShufflePartitions)
+ }
+
+ test(s"determining the number of reducers: plan already partitioned$testNameNote") {
+ val test: SparkSession => Unit = { spark: SparkSession =>
+ try {
+ spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
+ // `df1` is hash partitioned by `id`.
+ val df1 = spark.read.table("t")
+ val df2 =
+ spark
+ .range(0, 1000, 1, numInputPartitions)
+ .selectExpr("id % 500 as key2", "id as value2")
+
+ val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))
+
+ // Check the answer first.
+ val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
+ .union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
+ checkAnswer(
+ join,
+ expectedAnswer.collect())
+
+ // Then, let's make sure we do not reduce the number of post-shuffle partitions.
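+ // (`df1` is read pre-partitioned from the bucketed table, so presumably
+ // not every input of the join comes from a shuffle stage and the rule
+ // leaves the plan alone.)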
+ val finalPlan = join.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ val shuffleReaders = finalPlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
+ }
+ assert(shuffleReaders.length === 0)
+ } finally {
+ spark.sql("drop table t")
+ }
+ }
+ withSparkSession(test, 12000, minNumPostShufflePartitions)
}
}
test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
- val test = { spark: SparkSession =>
+ val test: SparkSession => Unit = { spark: SparkSession =>
spark.sql("SET spark.sql.exchange.reuse=true")
val df = spark.range(1).selectExpr("id AS key", "id AS value")
+
+ // test case 1: a query stage has 3 child stages but they are the same stage.
+ // Final Stage 1
+ // ShuffleQueryStage 0
+ // ReusedQueryStage 0
+ // ReusedQueryStage 0
val resultDf = df.join(df, "key").join(df, "key")
- val sparkPlan = resultDf.queryExecution.executedPlan
- assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
- assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+ val finalPlan = resultDf.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2)
+ assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3)
+
+
+ // test case 2: a query stage has 2 parent stages.
+ // Final Stage 3
+ // ShuffleQueryStage 1
+ // ShuffleQueryStage 0
+ // ShuffleQueryStage 2
+ // ReusedQueryStage 0
+ val grouped = df.groupBy("key").agg(max("value").as("value"))
+ val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+ .union(grouped.groupBy(col("key") + 2).max("value"))
+ checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+
+ val finalPlan2 = resultDf2.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+
+ // The result stage has 2 children
+ val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+ assert(level1Stages.length == 2)
+
+ val leafStages = level1Stages.flatMap { stage =>
+ // All of the child stages of result stage have only one child stage.
+ val children = stage.plan.collect { case q: QueryStageExec => q }
+ assert(children.length == 1)
+ children
+ }
+ assert(leafStages.length == 2)
+
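+ // Both branches of the union consume the shuffle written by the shared
+ // `grouped` aggregation, so one leaf stage is expected to be a reuse of
+ // the other (ShuffleQueryStage 0 / ReusedQueryStage 0 above).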
+ val reusedStages = level1Stages.flatMap { stage =>
+ stage.plan.collect { case r: ReusedQueryStageExec => r }
+ }
+ assert(reusedStages.length == 1)
}
withSparkSession(test, 4, None)
}
+
+ test("Union two datasets with different pre-shuffle partition number") {
+ val test: SparkSession => Unit = { spark: SparkSession =>
+ val dataset1 = spark.range(3)
+ val dataset2 = spark.range(3)
+
+ val resultDf = dataset1.repartition(2, dataset1.col("id"))
+ .union(dataset2.repartition(3, dataset2.col("id"))).toDF()
+
+ checkAnswer(resultDf,
+ Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
+ val finalPlan = resultDf.queryExecution.executedPlan
+ .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+ // As the pre-shuffle partition numbers are different, we will skip reducing
+ // the shuffle partition numbers.
+ assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 0)
+ }
+ withSparkSession(test, 100, None)
+ }
}
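The skip in the last test follows from the rule computing one shared set of
partition start indices for every participating shuffle, which is only
well-defined when all shuffles report the same pre-shuffle partition count. A
toy check (types and names here are illustrative, not Spark's):

    final case class MapStats(bytesByPartitionId: Array[Long])

    def samePreShufflePartitionCount(stats: Seq[MapStats]): Boolean =
      stats.map(_.bytesByPartitionId.length).distinct.size == 1

    // repartition(2, ...) unioned with repartition(3, ...):
    val union = Seq(MapStats(Array(10L, 20L)), MapStats(Array(5L, 5L, 5L)))
    assert(!samePreShufflePartitionCount(union)) // coalescing is skipped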
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2cddf7c..0a0973a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec
import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf
@@ -92,6 +93,30 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
}
}
+ test("Change merge join to broadcast join and reduce number of shuffle partitions") {
+ withSQLConf(
+ SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+ SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
+ SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
+ val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM testData join testData2 ON key = a where value = '1'")
+ val smj = findTopLevelSortMergeJoin(plan)
+ assert(smj.size == 1)
+ val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+ assert(bhj.size == 1)
+
+ val shuffleReaders = adaptivePlan.collect {
+ case reader: CoalescedShuffleReaderExec => reader
+ }
+ assert(shuffleReaders.length === 1)
+ // The pre-shuffle partition size is [0, 72, 0, 72, 126]
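+ // With target size 150, partitions 0-3 pack into one group
+ // (0 + 72 + 0 + 72 = 144 <= 150) and partition 4 (126) starts the
+ // next, hence 2 post-shuffle partitions.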
+ shuffleReaders.foreach { reader =>
+ assert(reader.outputPartitioning.numPartitions === 2)
+ }
+ }
+ }
+
test("Scalar subquery") {
withSQLConf(
SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",