You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/07/04 08:03:29 UTC

[spark] branch master updated: [SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cec6a32  [SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
cec6a32 is described below

commit cec6a329044fa7d8a4f3da3871dbacac95cc38ed
Author: Carson Wang <ca...@intel.com>
AuthorDate: Thu Jul 4 16:03:04 2019 +0800

    [SPARK-28177][SQL] Adjust post shuffle partition number in adaptive execution
    
    ## What changes were proposed in this pull request?
    This is to implement a ReduceNumShufflePartitions rule in the new adaptive execution framework introduced in #24706. This rule is used to adjust the post shuffle partitions based on the map output statistics.
    
    ## How was this patch tested?
    Added ReduceNumShufflePartitionsSuite
    
    Closes #24978 from carsonwang/reduceNumShufflePartitions.
    
    Authored-by: Carson Wang <ca...@intel.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  33 ++-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |   2 +
 .../sql/execution/adaptive/QueryStageExec.scala    |  14 +-
 .../adaptive/ReduceNumShufflePartitions.scala      | 200 +++++++++++++++
 .../execution/exchange/EnsureRequirements.scala    | 120 +--------
 .../execution/exchange/ExchangeCoordinator.scala   | 277 ---------------------
 .../execution/exchange/ShuffleExchangeExec.scala   |  68 +----
 .../sql/execution/streaming/StreamExecution.scala  |   2 +-
 .../sql/streaming/StreamingQueryManager.scala      |   4 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |   2 +-
 .../apache/spark/sql/execution/PlannerSuite.scala  |  11 +-
 ...scala => ReduceNumShufflePartitionsSuite.scala} | 256 +++++++++++++------
 .../adaptive/AdaptiveQueryExecSuite.scala          |  25 ++
 13 files changed, 465 insertions(+), 549 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e4c58ef..af67632 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -311,16 +311,30 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+    .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
+      "post-shuffle partitions based on map output statistics.")
+    .booleanConf
+    .createWithDefault(true)
+
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
-      .internal()
-      .doc("The advisory minimal number of post-shuffle partitions provided to " +
-        "ExchangeCoordinator. This setting is used in our test to make sure we " +
-        "have enough parallelism to expose issues that will not be exposed with a " +
-        "single partition. When the value is a non-positive value, this setting will " +
-        "not be provided to ExchangeCoordinator.")
+      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
-      .createWithDefault(-1)
+      .checkValue(_ > 0, "The minimum shuffle partition number " +
+        "must be a positive integer.")
+      .createWithDefault(1)
+
+  val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
+    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+      .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
+        "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
+        "spark.sql.shuffle.partitions")
+      .intConf
+      .checkValue(_ > 0, "The maximum shuffle partition number " +
+        "must be a positive integer.")
+      .createOptional
 
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
@@ -1939,9 +1953,14 @@ class SQLConf extends Serializable with Logging {
   def adaptiveExecutionEnabled: Boolean =
     getConf(ADAPTIVE_EXECUTION_ENABLED) && !getConf(RUNTIME_REOPTIMIZATION_ENABLED)
 
+  def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+
   def minNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
+  def maxNumPostShufflePartitions: Int =
+    getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
+
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 606fbd8..0708878 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
+import org.apache.spark.sql.execution.adaptive.rule.ReduceNumShufflePartitions
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.internal.SQLConf
@@ -82,6 +83,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    ReduceNumShufflePartitions(conf),
     CollapseCodegenStages(conf)
   )
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 98cb7d0..c803ca3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -161,9 +161,21 @@ case class BroadcastQueryStageExec(
   }
 }
 
+object ShuffleQueryStageExec {
+  /**
+   * Returns true if the plan is a [[ShuffleQueryStageExec]] or a reused [[ShuffleQueryStageExec]].
+   */
+  def isShuffleQueryStageExec(plan: SparkPlan): Boolean = plan match {
+    case r: ReusedQueryStageExec => isShuffleQueryStageExec(r.plan)
+    case _: ShuffleQueryStageExec => true
+    case _ => false
+  }
+}
+
 object BroadcastQueryStageExec {
   /**
-   * Returns if the plan is a [[BroadcastQueryStageExec]] or a reused [[BroadcastQueryStageExec]].
+   * Returns true if the plan is a [[BroadcastQueryStageExec]] or a reused
+   * [[BroadcastQueryStageExec]].
    */
   def isBroadcastQueryStageExec(plan: SparkPlan): Boolean = plan match {
     case r: ReusedQueryStageExec => isBroadcastQueryStageExec(r.plan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
new file mode 100644
index 0000000..d93eb76
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive.rule
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ReusedQueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A rule to adjust the post shuffle partitions based on the map output statistics.
+ *
+ * The strategy used to determine the number of post-shuffle partitions is described as follows.
+ * To determine the number of post-shuffle partitions, we have a target input size for a
+ * post-shuffle partition. Once we have size statistics of all pre-shuffle partitions, we will do
+ * a pass of those statistics and pack pre-shuffle partitions with continuous indices to a single
+ * post-shuffle partition until adding another pre-shuffle partition would cause the size of a
+ * post-shuffle partition to be greater than the target size.
+ *
+ * For example, we have two stages with the following pre-shuffle partition size statistics:
+ * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
+ * stage 2: [10 MiB,  10 MiB, 70 MiB,  5 MiB, 5 MiB]
+ * assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
+ * which are:
+ *  - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
+ *  - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
+ *  - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
+ *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
+ */
+case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.reducePostShufflePartitionsEnabled) {
+      return plan
+    }
+    if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) {
+      // If not all leaf nodes are query stages, it's not safe to reduce the number of
+      // shuffle partitions, because we may break the assumption that all children of a spark plan
+      // have same number of output partitions.
+      plan
+    } else {
+      val shuffleStages = plan.collect {
+        case stage: ShuffleQueryStageExec => stage
+        case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => stage
+      }
+      val shuffleMetrics = shuffleStages.map { stage =>
+        val metricsFuture = stage.mapOutputStatisticsFuture
+        assert(metricsFuture.isCompleted, "ShuffleQueryStageExec should already be ready")
+        ThreadUtils.awaitResult(metricsFuture, Duration.Zero)
+      }
+
+      // `ShuffleQueryStageExec` gives null mapOutputStatistics when the input RDD has 0 partitions,
+      // we should skip it when calculating the `partitionStartIndices`.
+      val validMetrics = shuffleMetrics.filter(_ != null)
+      // We may get different pre-shuffle partition number if user calls repartition manually.
+      // We don't reduce shuffle partition number in that case.
+      val distinctNumPreShufflePartitions =
+        validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
+
+      if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
+        val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray)
+        // This transformation adds new nodes, so we must use `transformUp` here.
+        plan.transformUp {
+          // even for shuffle exchange whose input RDD has 0 partition, we should still update its
+          // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
+          // number of output partitions.
+          case stage: QueryStageExec if ShuffleQueryStageExec.isShuffleQueryStageExec(stage) =>
+            CoalescedShuffleReaderExec(stage, partitionStartIndices)
+        }
+      } else {
+        plan
+      }
+    }
+  }
+
+  /**
+   * Estimates partition start indices for post-shuffle partitions based on
+   * mapOutputStatistics provided by all pre-shuffle stages.
+   */
+  // visible for testing.
+  private[sql] def estimatePartitionStartIndices(
+      mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
+    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
+    val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize
+    // If minNumPostShufflePartitions is defined, it is possible that we need to use a
+    // value less than advisoryTargetPostShuffleInputSize as the target input size of
+    // a post shuffle task.
+    val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
+    // The max at here is to make sure that when we have an empty table, we
+    // only have a single post-shuffle partition.
+    // There is no particular reason that we pick 16. We just need a number to
+    // prevent maxPostShuffleInputSize from being set to 0.
+    val maxPostShuffleInputSize = math.max(
+      math.ceil(totalPostShuffleInputSize / minNumPostShufflePartitions.toDouble).toLong, 16)
+    val targetPostShuffleInputSize =
+      math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
+
+    logInfo(
+      s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
+        s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
+
+    // Make sure we do get the same number of pre-shuffle partitions for those stages.
+    val distinctNumPreShufflePartitions =
+      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
+    // The reason that we are expecting a single value of the number of pre-shuffle partitions
+    // is that when we add Exchanges, we set the number of pre-shuffle partitions
+    // (i.e. map output partitions) using a static setting, which is the value of
+    // spark.sql.shuffle.partitions. Even if two input RDDs are having different
+    // number of partitions, they will have the same number of pre-shuffle partitions
+    // (i.e. map output partitions).
+    assert(
+      distinctNumPreShufflePartitions.length == 1,
+      "There should be only one distinct value of the number pre-shuffle partitions " +
+        "among registered Exchange operator.")
+    val numPreShufflePartitions = distinctNumPreShufflePartitions.head
+
+    val partitionStartIndices = ArrayBuffer[Int]()
+    // The first element of partitionStartIndices is always 0.
+    partitionStartIndices += 0
+
+    var postShuffleInputSize = 0L
+
+    var i = 0
+    while (i < numPreShufflePartitions) {
+      // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
+      // Then, we add the total size to postShuffleInputSize.
+      var nextShuffleInputSize = 0L
+      var j = 0
+      while (j < mapOutputStatistics.length) {
+        nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
+        j += 1
+      }
+
+      // If including the nextShuffleInputSize would exceed the target partition size, then start a
+      // new partition.
+      if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
+        partitionStartIndices += i
+        // reset postShuffleInputSize.
+        postShuffleInputSize = nextShuffleInputSize
+      } else {
+        postShuffleInputSize += nextShuffleInputSize
+      }
+
+      i += 1
+    }
+
+    partitionStartIndices.toArray
+  }
+}
+
+case class CoalescedShuffleReaderExec(
+    child: QueryStageExec,
+    partitionStartIndices: Array[Int]) extends UnaryExecNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def outputPartitioning: Partitioning = {
+    UnknownPartitioning(partitionStartIndices.length)
+  }
+
+  private var cachedShuffleRDD: ShuffledRowRDD = null
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (cachedShuffleRDD == null) {
+      cachedShuffleRDD = child match {
+        case stage: ShuffleQueryStageExec =>
+          stage.plan.createShuffledRDD(Some(partitionStartIndices))
+        case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) =>
+          stage.plan.createShuffledRDD(Some(partitionStartIndices))
+      }
+    }
+    cachedShuffleRDD
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index d2d5011..b7196d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,107 +36,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
+    if (conf.runtimeReoptimizationEnabled) {
+      conf.maxNumPostShufflePartitions
+    } else {
+      conf.numShufflePartitions
+    }
 
   private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -189,7 +94,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
             val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
             child match {
               // If child is an exchange, we replace it with a new one having defaultPartitioning.
-              case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
+              case ShuffleExchangeExec(_, c) => ShuffleExchangeExec(defaultPartitioning, c)
               case _ => ShuffleExchangeExec(defaultPartitioning, child)
             }
           }
@@ -198,15 +103,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       }
     }
 
-    // Now, we need to add ExchangeCoordinator if necessary.
-    // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
-    // However, with the way that we plan the query, we do not have a place where we have a
-    // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
-    // at here for now.
-    // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
-    // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
-    children = withExchangeCoordinator(children, requiredChildDistributions)
-
     // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
     children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
       // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
@@ -295,7 +191,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
-    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
+    case operator @ ShuffleExchangeExec(upper: HashPartitioning, child) =>
       child.outputPartitioning match {
         case lower: HashPartitioning if upper.semanticEquals(lower) => child
         case _ => operator
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
deleted file mode 100644
index c99bf45..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.exchange
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{MapOutputStatistics, ShuffleDependency, SimpleFutureAction}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
-
-/**
- * A coordinator used to determines how we shuffle data between stages generated by Spark SQL.
- * Right now, the work of this coordinator is to determine the number of post-shuffle partitions
- * for a stage that needs to fetch shuffle data from one or multiple stages.
- *
- * A coordinator is constructed with three parameters, `numExchanges`,
- * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- *  - `numExchanges` is used to indicated that how many [[ShuffleExchangeExec]]s that will be
- *    registered to this coordinator. So, when we start to do any actual work, we have a way to
- *    make sure that we have got expected number of [[ShuffleExchangeExec]]s.
- *  - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
- *    input data size. With this parameter, we can estimate the number of post-shuffle partitions.
- *    This parameter is configured through
- *    `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
- *  - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
- *    will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
- *    partitions.
- *
- * The workflow of this coordinator is described as follows:
- *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchangeExec]] operator,
- *    if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
- *    This happens in the `doPrepare` method.
- *  - Once we start to execute a physical plan, a [[ShuffleExchangeExec]] registered to this
- *    coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
- *    [[ShuffledRowRDD]].
- *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchangeExec]]
- *    will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
- *  - If this coordinator has not made the decision on how to shuffle data, it will ask those
- *    registered [[ShuffleExchangeExec]]s to submit their pre-shuffle stages. Then, based on the
- *    size statistics of pre-shuffle partitions, this coordinator will determine the number of
- *    post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
- *    to a single post-shuffle partition whenever necessary.
- *  - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- *    [[ShuffleExchangeExec]]s. So, when a [[ShuffleExchangeExec]] calls `postShuffleRDD`, this
- *    coordinator can lookup the corresponding [[RDD]].
- *
- * The strategy used to determine the number of post-shuffle partitions is described as follows.
- * To determine the number of post-shuffle partitions, we have a target input size for a
- * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[ShuffleExchangeExec]]s, we will do a pass of those statistics
- * and pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
- * adding another pre-shuffle partition would cause the size of a post-shuffle partition to be
- * greater than the target size.
- *
- * For example, we have two stages with the following pre-shuffle partition size statistics:
- * stage 1: [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
- * stage 2: [10 MiB,  10 MiB, 70 MiB,  5 MiB, 5 MiB]
- * assuming the target input size is 128 MiB, we will have four post-shuffle partitions,
- * which are:
- *  - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MiB)
- *  - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MiB)
- *  - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MiB)
- *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MiB)
- */
-class ExchangeCoordinator(
-    advisoryTargetPostShuffleInputSize: Long,
-    minNumPostShufflePartitions: Option[Int] = None)
-  extends Logging {
-
-  // The registered Exchange operators.
-  private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
-
-  // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the
-  // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is
-  // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails
-  // in `doEstimationIfNecessary`.
-  private[this] lazy val numExchanges = exchanges.size
-
-  // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
-    new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
-
-  // A boolean that indicates if this coordinator has made decision on how to shuffle data.
-  // This variable will only be updated by doEstimationIfNecessary, which is protected by
-  // synchronized.
-  @volatile private[this] var estimated: Boolean = false
-
-  /**
-   * Registers a [[ShuffleExchangeExec]] operator to this coordinator. This method is only allowed
-   * to be called in the `doPrepare` method of a [[ShuffleExchangeExec]] operator.
-   */
-  @GuardedBy("this")
-  def registerExchange(exchange: ShuffleExchangeExec): Unit = synchronized {
-    exchanges += exchange
-  }
-
-  def isEstimated: Boolean = estimated
-
-  /**
-   * Estimates partition start indices for post-shuffle partitions based on
-   * mapOutputStatistics provided by all pre-shuffle stages.
-   */
-  def estimatePartitionStartIndices(
-      mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If minNumPostShufflePartitions is defined, it is possible that we need to use a
-    // value less than advisoryTargetPostShuffleInputSize as the target input size of
-    // a post shuffle task.
-    val targetPostShuffleInputSize = minNumPostShufflePartitions match {
-      case Some(numPartitions) =>
-        val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
-        // The max at here is to make sure that when we have an empty table, we
-        // only have a single post-shuffle partition.
-        // There is no particular reason that we pick 16. We just need a number to
-        // prevent maxPostShuffleInputSize from being set to 0.
-        val maxPostShuffleInputSize =
-          math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
-        math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
-
-      case None => advisoryTargetPostShuffleInputSize
-    }
-
-    logInfo(
-      s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
-      s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
-
-    // Make sure we do get the same number of pre-shuffle partitions for those stages.
-    val distinctNumPreShufflePartitions =
-      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
-    // The reason that we are expecting a single value of the number of pre-shuffle partitions
-    // is that when we add Exchanges, we set the number of pre-shuffle partitions
-    // (i.e. map output partitions) using a static setting, which is the value of
-    // spark.sql.shuffle.partitions. Even if two input RDDs are having different
-    // number of partitions, they will have the same number of pre-shuffle partitions
-    // (i.e. map output partitions).
-    assert(
-      distinctNumPreShufflePartitions.length == 1,
-      "There should be only one distinct value of the number pre-shuffle partitions " +
-        "among registered Exchange operator.")
-    val numPreShufflePartitions = distinctNumPreShufflePartitions.head
-
-    val partitionStartIndices = ArrayBuffer[Int]()
-    // The first element of partitionStartIndices is always 0.
-    partitionStartIndices += 0
-
-    var postShuffleInputSize = 0L
-
-    var i = 0
-    while (i < numPreShufflePartitions) {
-      // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
-      // Then, we add the total size to postShuffleInputSize.
-      var nextShuffleInputSize = 0L
-      var j = 0
-      while (j < mapOutputStatistics.length) {
-        nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
-        j += 1
-      }
-
-      // If including the nextShuffleInputSize would exceed the target partition size, then start a
-      // new partition.
-      if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
-        partitionStartIndices += i
-        // reset postShuffleInputSize.
-        postShuffleInputSize = nextShuffleInputSize
-      } else postShuffleInputSize += nextShuffleInputSize
-
-      i += 1
-    }
-
-    partitionStartIndices.toArray
-  }
-
-  @GuardedBy("this")
-  private def doEstimationIfNecessary(): Unit = synchronized {
-    // It is unlikely that this method will be called from multiple threads
-    // (when multiple threads trigger the execution of THIS physical)
-    // because in common use cases, we will create new physical plan after
-    // users apply operations (e.g. projection) to an existing DataFrame.
-    // However, if it happens, we have synchronized to make sure only one
-    // thread will trigger the job submission.
-    if (!estimated) {
-      // Make sure we have the expected number of registered Exchange operators.
-      assert(exchanges.length == numExchanges)
-
-      val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)
-
-      // Submit all map stages
-      val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
-      val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
-      var i = 0
-      while (i < numExchanges) {
-        val exchange = exchanges(i)
-        val shuffleDependency = exchange.shuffleDependency
-        shuffleDependencies += shuffleDependency
-        if (shuffleDependency.rdd.partitions.length != 0) {
-          // submitMapStage does not accept RDD with 0 partition.
-          // So, we will not submit this dependency.
-          submittedStageFutures +=
-            exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
-        }
-        i += 1
-      }
-
-      // Wait for the finishes of those submitted map stages.
-      val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
-      var j = 0
-      while (j < submittedStageFutures.length) {
-        // This call is a blocking call. If the stage has not finished, we will wait at here.
-        mapOutputStatistics(j) = submittedStageFutures(j).get()
-        j += 1
-      }
-
-      // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-      // a stage when the number of partitions of this dependency is 0.
-      assert(mapOutputStatistics.length <= numExchanges)
-
-      // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
-      // number of post-shuffle partitions.
-      val partitionStartIndices =
-        if (mapOutputStatistics.length == 0) {
-          Array.empty[Int]
-        } else {
-          estimatePartitionStartIndices(mapOutputStatistics)
-        }
-
-      var k = 0
-      while (k < numExchanges) {
-        val exchange = exchanges(k)
-        val rdd =
-          exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices))
-        newPostShuffleRDDs.put(exchange, rdd)
-
-        k += 1
-      }
-
-      // Finally, we set postShuffleRDDs and estimated.
-      assert(postShuffleRDDs.isEmpty)
-      assert(newPostShuffleRDDs.size() == numExchanges)
-      postShuffleRDDs.putAll(newPostShuffleRDDs)
-      estimated = true
-    }
-  }
-
-  def postShuffleRDD(exchange: ShuffleExchangeExec): ShuffledRowRDD = {
-    doEstimationIfNecessary()
-
-    if (!postShuffleRDDs.containsKey(exchange)) {
-      throw new IllegalStateException(
-        s"The given $exchange is not registered in this coordinator.")
-    }
-
-    postShuffleRDDs.get(exchange)
-  }
-
-  override def toString: String = {
-    s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 31f75e3..5d0208f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -39,12 +39,11 @@ import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
 
 /**
- * Performs a shuffle that will result in the desired `newPartitioning`.
+ * Performs a shuffle that will result in the desired partitioning.
  */
 case class ShuffleExchangeExec(
-    var newPartitioning: Partitioning,
-    child: SparkPlan,
-    @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
+    override val outputPartitioning: Partitioning,
+    child: SparkPlan) extends Exchange {
 
   // NOTE: coordinator can be null after serialization/deserialization,
   //       e.g. it can be null on the Executor side
@@ -56,37 +55,11 @@ case class ShuffleExchangeExec(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
   ) ++ readMetrics ++ writeMetrics
 
-  override def nodeName: String = {
-    val extraInfo = coordinator match {
-      case Some(exchangeCoordinator) =>
-        s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-      case _ => ""
-    }
-
-    val simpleNodeName = "Exchange"
-    s"$simpleNodeName$extraInfo"
-  }
-
-  override def outputPartitioning: Partitioning = newPartitioning
+  override def nodeName: String = "Exchange"
 
   private val serializer: Serializer =
     new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
 
-  override protected def doPrepare(): Unit = {
-    // If an ExchangeCoordinator is needed, we register this Exchange operator
-    // to the coordinator when we do prepare. It is important to make sure
-    // we register this operator right before the execution instead of register it
-    // in the constructor because it is possible that we create new instances of
-    // Exchange operators when we transform the physical plan
-    // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
-    // So, we should only call registerExchange just before we start to execute
-    // the plan.
-    coordinator match {
-      case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case _ =>
-    }
-  }
-
   @transient lazy val inputRDD: RDD[InternalRow] = child.execute()
 
   /**
@@ -99,28 +72,13 @@ case class ShuffleExchangeExec(
     ShuffleExchangeExec.prepareShuffleDependency(
       inputRDD,
       child.output,
-      newPartitioning,
+      outputPartitioning,
       serializer,
       writeMetrics)
   }
 
-  /**
-   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
-   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
-   * partition start indices array. If this optional array is defined, the returned
-   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
-   */
-  private[exchange] def preparePostShuffleRDD(
-      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
-      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
-    // If an array of partition start indices is provided, we need to use this array
-    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
-    // update the number of post-shuffle partitions.
-    specifiedPartitionStartIndices.foreach { indices =>
-      assert(newPartitioning.isInstanceOf[HashPartitioning])
-      newPartitioning = UnknownPartitioning(indices.length)
-    }
-    new ShuffledRowRDD(shuffleDependency, readMetrics, specifiedPartitionStartIndices)
+  def createShuffledRDD(partitionStartIndices: Option[Array[Int]]): ShuffledRowRDD = {
+    new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices)
   }
 
   /**
@@ -131,23 +89,13 @@ case class ShuffleExchangeExec(
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
     if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = coordinator match {
-        case Some(exchangeCoordinator) =>
-          val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
-          assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
-          shuffleRDD
-        case _ =>
-          preparePostShuffleRDD(shuffleDependency)
-      }
+      cachedShuffleRDD = createShuffledRDD(None)
     }
     cachedShuffleRDD
   }
 }
 
 object ShuffleExchangeExec {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchangeExec = {
-    ShuffleExchangeExec(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator])
-  }
 
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 7c1f6ca..eb95719 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -320,7 +320,7 @@ abstract class StreamExecution(
       logicalPlan
 
       // Adaptive execution can change num shuffle partitions, disallow
-      sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+      sparkSessionForStream.conf.set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "false")
       // Disable cost-based join optimization as we do not want stateful operations to be rearranged
       sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
       offsetSeqMetadata = OffsetSeqMetadata(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 1705d56..0b472c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -255,8 +255,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 
     val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
 
-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+    if (sparkSession.sessionState.conf.runtimeReoptimizationEnabled) {
+      logWarning(s"${SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key} " +
           "is not supported in streaming DataFrames/Datasets and will be disabled.")
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4b08a4b..efd5db1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1365,7 +1365,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
         val agg = cp.groupBy('id % 2).agg(count('id))
 
         agg.queryExecution.executedPlan.collectFirst {
-          case ShuffleExchangeExec(_, _: RDDScanExec, _) =>
+          case ShuffleExchangeExec(_, _: RDDScanExec) =>
           case BroadcastExchangeExec(_, _: RDDScanExec) =>
         }.foreach { _ =>
           fail(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index faa7cbb..c2d9e54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -413,8 +413,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning),
-      None)
+      DummySparkPlan(outputPartitioning = partitioning))
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 2) {
@@ -429,8 +428,7 @@ class PlannerSuite extends SharedSQLContext {
 
     val inputPlan = ShuffleExchangeExec(
       partitioning,
-      DummySparkPlan(outputPartitioning = partitioning),
-      None)
+      DummySparkPlan(outputPartitioning = partitioning))
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
     if (outputPlan.collect { case e: ShuffleExchangeExec => true }.size == 1) {
@@ -452,7 +450,7 @@ class PlannerSuite extends SharedSQLContext {
     val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(inputPlan)
     val shuffle = outputPlan.collect { case e: ShuffleExchangeExec => e }
     assert(shuffle.size === 1)
-    assert(shuffle.head.newPartitioning === finalPartitioning)
+    assert(shuffle.head.outputPartitioning === finalPartitioning)
   }
 
   test("Reuse exchanges") {
@@ -464,8 +462,7 @@ class PlannerSuite extends SharedSQLContext {
       DummySparkPlan(
         children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
         requiredChildDistribution = Seq(distribution),
-        requiredChildOrdering = Seq(Seq.empty)),
-      None)
+        requiredChildOrdering = Seq(Seq.empty)))
 
     val inputPlan = SortMergeJoinExec(
       Literal(1) :: Nil,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
similarity index 63%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
index 74f33f6..fea77c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala
@@ -22,11 +22,12 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.adaptive._
+import org.apache.spark.sql.execution.adaptive.rule.{CoalescedShuffleReaderExec, ReduceNumShufflePartitions}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 
-class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
+class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private var originalActiveSparkSession: Option[SparkSession] = _
   private var originalInstantiatedSparkSession: Option[SparkSession] = _
@@ -51,7 +52,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   private def checkEstimation(
-      coordinator: ExchangeCoordinator,
+      rule: ReduceNumShufflePartitions,
       bytesByPartitionIdArray: Array[Array[Long]],
       expectedPartitionStartIndices: Array[Int]): Unit = {
     val mapOutputStatistics = bytesByPartitionIdArray.zipWithIndex.map {
@@ -59,18 +60,27 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         new MapOutputStatistics(index, bytesByPartitionId)
     }
     val estimatedPartitionStartIndices =
-      coordinator.estimatePartitionStartIndices(mapOutputStatistics)
+      rule.estimatePartitionStartIndices(mapOutputStatistics)
     assert(estimatedPartitionStartIndices === expectedPartitionStartIndices)
   }
 
+  private def createReduceNumShufflePartitionsRule(
+      advisoryTargetPostShuffleInputSize: Long,
+      minNumPostShufflePartitions: Int = 1): ReduceNumShufflePartitions = {
+    val conf = new SQLConf().copy(
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE -> advisoryTargetPostShuffleInputSize,
+      SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS -> minNumPostShufflePartitions)
+    ReduceNumShufflePartitions(conf)
+  }
+
   test("test estimatePartitionStartIndices - 1 Exchange") {
-    val coordinator = new ExchangeCoordinator(100L)
+    val rule = createReduceNumShufflePartitionsRule(100L)
 
     {
       // All bytes per partition are 0.
       val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
       val expectedPartitionStartIndices = Array[Int](0)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
 
     {
@@ -78,40 +88,40 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       // 1 post-shuffle partition is needed.
       val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
       val expectedPartitionStartIndices = Array[Int](0)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
 
     {
       // 2 post-shuffle partitions are needed.
       val bytesByPartitionId = Array[Long](10, 0, 90, 20, 0)
       val expectedPartitionStartIndices = Array[Int](0, 3)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
 
     {
       // There are a few large pre-shuffle partitions.
       val bytesByPartitionId = Array[Long](110, 10, 100, 110, 0)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
 
     {
       // All pre-shuffle partitions are larger than the targeted size.
       val bytesByPartitionId = Array[Long](100, 110, 100, 110, 110)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
 
     {
       // The last pre-shuffle partition is in a single post-shuffle partition.
       val bytesByPartitionId = Array[Long](30, 30, 0, 40, 110)
       val expectedPartitionStartIndices = Array[Int](0, 4)
-      checkEstimation(coordinator, Array(bytesByPartitionId), expectedPartitionStartIndices)
+      checkEstimation(rule, Array(bytesByPartitionId), expectedPartitionStartIndices)
     }
   }
 
   test("test estimatePartitionStartIndices - 2 Exchanges") {
-    val coordinator = new ExchangeCoordinator(100L)
+    val rule = createReduceNumShufflePartitionsRule(100L)
 
     {
       // If there are multiple values of the number of pre-shuffle partitions,
@@ -122,7 +132,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
         Array(
           new MapOutputStatistics(0, bytesByPartitionId1),
           new MapOutputStatistics(1, bytesByPartitionId2))
-      intercept[AssertionError](coordinator.estimatePartitionStartIndices(mapOutputStatistics))
+      intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics))
     }
 
     {
@@ -131,7 +141,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
       val expectedPartitionStartIndices = Array[Int](0)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -143,7 +153,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 20, 0, 20)
       val expectedPartitionStartIndices = Array[Int](0)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -154,7 +164,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
       val expectedPartitionStartIndices = Array[Int](0, 2, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -165,7 +175,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -176,7 +186,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 70, 0, 30)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -187,7 +197,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 0, 110)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -198,14 +208,14 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](30, 0, 60, 70, 110)
       val expectedPartitionStartIndices = Array[Int](0, 1, 2, 3, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
   }
 
   test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
-    val coordinator = new ExchangeCoordinator(100L, Some(2))
+    val rule = createReduceNumShufflePartitionsRule(100L, 2)
 
     {
       // The minimal number of post-shuffle partitions is not enforced because
@@ -214,7 +224,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
       val expectedPartitionStartIndices = Array[Int](0)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -225,7 +235,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](5, 10, 0, 10, 5)
       val expectedPartitionStartIndices = Array[Int](0, 3)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -236,7 +246,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       val bytesByPartitionId2 = Array[Long](40, 10, 0, 10, 30)
       val expectedPartitionStartIndices = Array[Int](0, 1, 3, 4)
       checkEstimation(
-        coordinator,
+        rule,
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionStartIndices)
     }
@@ -257,24 +267,24 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   def withSparkSession(
       f: SparkSession => Unit,
-      targetNumPostShufflePartitions: Int,
+      targetPostShuffleInputSize: Int,
       minNumPostShufflePartitions: Option[Int]): Unit = {
     val sparkConf =
       new SparkConf(false)
         .setMaster("local[*]")
         .setAppName("test")
         .set(UI_ENABLED, false)
-        .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
-        .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+        .set(SQLConf.SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS.key, "5")
+        .set(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key, "true")
         .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
         .set(
           SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key,
-          targetNumPostShufflePartitions.toString)
+          targetPostShuffleInputSize.toString)
     minNumPostShufflePartitions match {
       case Some(numPartitions) =>
         sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, numPartitions.toString)
       case None =>
-        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "-1")
+        sparkConf.set(SQLConf.SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS.key, "1")
     }
 
     val spark = SparkSession.builder()
@@ -304,25 +314,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = agg.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = agg.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 1)
+        assert(shuffleReaders.length === 1)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 3)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 3)
             }
         }
       }
@@ -355,25 +361,21 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = join.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 2)
+        assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 2)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 2)
             }
         }
       }
@@ -411,26 +413,26 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = join.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 4)
+        assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(2, 3))
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 2)
+            }
         }
       }
 
-      withSparkSession(test, 6644, minNumPostShufflePartitions)
+      withSparkSession(test, 16384, minNumPostShufflePartitions)
     }
 
     test(s"determining the number of reducers: complex query 2$testNameNote") {
@@ -463,39 +465,131 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
 
         // Then, let's look at the number of post-shuffle partitions estimated
         // by the ExchangeCoordinator.
-        val exchanges = join.queryExecution.executedPlan.collect {
-          case e: ShuffleExchangeExec => e
+        val finalPlan = join.queryExecution.executedPlan
+          .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+        val shuffleReaders = finalPlan.collect {
+          case reader: CoalescedShuffleReaderExec => reader
         }
-        assert(exchanges.length === 3)
+        assert(shuffleReaders.length === 2)
         minNumPostShufflePartitions match {
           case Some(numPartitions) =>
-            exchanges.foreach {
-              case e: ShuffleExchangeExec =>
-                assert(e.coordinator.isDefined)
-                assert(e.outputPartitioning.numPartitions === 5)
-              case o =>
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === numPartitions)
             }
 
           case None =>
-            assert(exchanges.forall(_.coordinator.isDefined))
-            assert(exchanges.map(_.outputPartitioning.numPartitions).toSet === Set(5, 3))
+            shuffleReaders.foreach { reader =>
+              assert(reader.outputPartitioning.numPartitions === 3)
+            }
         }
       }
 
-      withSparkSession(test, 6144, minNumPostShufflePartitions)
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
+    }
+
+    test(s"determining the number of reducers: plan already partitioned$testNameNote") {
+      val test: SparkSession => Unit = { spark: SparkSession =>
+        try {
+          spark.range(1000).write.bucketBy(30, "id").saveAsTable("t")
+          // `df1` is hash partitioned by `id`.
+          val df1 = spark.read.table("t")
+          val df2 =
+            spark
+              .range(0, 1000, 1, numInputPartitions)
+              .selectExpr("id % 500 as key2", "id as value2")
+
+          val join = df1.join(df2, col("id") === col("key2")).select(col("id"), col("value2"))
+
+          // Check the answer first.
+          val expectedAnswer = spark.range(0, 500).selectExpr("id % 500", "id as value")
+            .union(spark.range(500, 1000).selectExpr("id % 500", "id as value"))
+          checkAnswer(
+            join,
+            expectedAnswer.collect())
+
+          // Then, let's make sure we do not reduce number of ppst shuffle partitions.
+          val finalPlan = join.queryExecution.executedPlan
+            .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+          val shuffleReaders = finalPlan.collect {
+            case reader: CoalescedShuffleReaderExec => reader
+          }
+          assert(shuffleReaders.length === 0)
+        } finally {
+          spark.sql("drop table t")
+        }
+      }
+      withSparkSession(test, 12000, minNumPostShufflePartitions)
     }
   }
 
   test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
-    val test = { spark: SparkSession =>
+    val test: SparkSession => Unit = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
       val df = spark.range(1).selectExpr("id AS key", "id AS value")
+
+      // test case 1: a query stage has 3 child stages but they are the same stage.
+      // Final Stage 1
+      //   ShuffleQueryStage 0
+      //   ReusedQueryStage 0
+      //   ReusedQueryStage 0
       val resultDf = df.join(df, "key").join(df, "key")
-      val sparkPlan = resultDf.queryExecution.executedPlan
-      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
-      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
       checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      val finalPlan = resultDf.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      assert(finalPlan.collect { case p: ReusedQueryStageExec => p }.length == 2)
+      assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 3)
+
+
+      // test case 2: a query stage has 2 parent stages.
+      // Final Stage 3
+      //   ShuffleQueryStage 1
+      //     ShuffleQueryStage 0
+      //   ShuffleQueryStage 2
+      //     ReusedQueryStage 0
+      val grouped = df.groupBy("key").agg(max("value").as("value"))
+      val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
+        .union(grouped.groupBy(col("key") + 2).max("value"))
+      checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+
+      val finalPlan2 = resultDf2.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+
+      // The result stage has 2 children
+      val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
+      assert(level1Stages.length == 2)
+
+      val leafStages = level1Stages.flatMap { stage =>
+        // All of the child stages of result stage have only one child stage.
+        val children = stage.plan.collect { case q: QueryStageExec => q }
+        assert(children.length == 1)
+        children
+      }
+      assert(leafStages.length == 2)
+
+      val reusedStages = level1Stages.flatMap { stage =>
+        stage.plan.collect { case r: ReusedQueryStageExec => r }
+      }
+      assert(reusedStages.length == 1)
     }
     withSparkSession(test, 4, None)
   }
+
+  test("Union two datasets with different pre-shuffle partition number") {
+    val test: SparkSession => Unit = { spark: SparkSession =>
+      val dataset1 = spark.range(3)
+      val dataset2 = spark.range(3)
+
+      val resultDf = dataset1.repartition(2, dataset1.col("id"))
+        .union(dataset2.repartition(3, dataset2.col("id"))).toDF()
+
+      checkAnswer(resultDf,
+        Seq((0), (0), (1), (1), (2), (2)).map(i => Row(i)))
+      val finalPlan = resultDf.queryExecution.executedPlan
+        .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+      // As the pre-shuffle partition number are different, we will skip reducing
+      // the shuffle partition numbers.
+      assert(finalPlan.collect { case p: CoalescedShuffleReaderExec => p }.length == 0)
+    }
+    withSparkSession(test, 100, None)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2cddf7c..0a0973a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.rule.CoalescedShuffleReaderExec
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
@@ -92,6 +93,30 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("Change merge join to broadcast join and reduce number of shuffle partitions") {
+    withSQLConf(
+      SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+      SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "150") {
+      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT * FROM testData join testData2 ON key = a where value = '1'")
+      val smj = findTopLevelSortMergeJoin(plan)
+      assert(smj.size == 1)
+      val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
+      assert(bhj.size == 1)
+
+      val shuffleReaders = adaptivePlan.collect {
+        case reader: CoalescedShuffleReaderExec => reader
+      }
+      assert(shuffleReaders.length === 1)
+      // The pre-shuffle partition size is [0, 72, 0, 72, 126]
+      shuffleReaders.foreach { reader =>
+        assert(reader.outputPartitioning.numPartitions === 2)
+      }
+    }
+  }
+
   test("Scalar subquery") {
     withSQLConf(
         SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org