Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/21 21:32:34 UTC

[2/2] spark git commit: [SPARK-13136][SQL] Create a dedicated Broadcast exchange operator

[SPARK-13136][SQL] Create a dedicated Broadcast exchange operator

Quite a few Spark SQL join operators broadcast one side of the join to all nodes. There are a few problems with this:

- This conflates broadcasting (a data exchange) with joining. Data exchanges should be managed by a different operator.
- All these nodes implement their own (duplicate) broadcasting logic.
- Re-use of indices is quite hard.

This PR defines both a ```BroadcastDistribution``` and a ```BroadcastPartitioning```; each of these contains a `BroadcastMode`. The `BroadcastMode` defines how the collected `Array` of `InternalRow`s is transformed into an index. We currently support the following `BroadcastMode`s:

- IdentityBroadcastMode: This broadcasts the rows in their original form.
- HashSetBroadcastMode: This applies a projection to the input rows, deduplicates these rows and broadcasts the resulting `Set`.
- HashedRelationBroadcastMode: This transforms the input rows into a `HashedRelation`, and broadcasts this index.

To satisfy this distribution we implement a ```BroadcastExchange``` operator, which performs the broadcast for us, and have ```EnsureRequirements``` plan this operator. The old Exchange operator has been renamed to ShuffleExchange in order to clearly separate shuffled and broadcast exchanges. Finally, the classes in Exchange.scala have been moved to a dedicated package.
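
As a rough illustration of how these pieces fit together, here is a simplified, self-contained sketch (not the actual Spark classes; the `Row` alias and `HashedByKeyBroadcastMode` are hypothetical stand-ins for `InternalRow` and `HashedRelationBroadcastMode`):

```scala
// Simplified, self-contained model of the new abstractions; not Spark's actual classes.
object BroadcastModeSketch {
  type Row = Seq[Any] // hypothetical stand-in for InternalRow

  // A BroadcastMode describes how the collected rows become the broadcasted value.
  trait BroadcastMode { def transform(rows: Array[Row]): Any }

  // Broadcast the rows unchanged (the shape of IdentityBroadcastMode).
  case object IdentityBroadcastMode extends BroadcastMode {
    override def transform(rows: Array[Row]): Array[Row] = rows
  }

  // Hypothetical analogue of HashedRelationBroadcastMode: index rows by one key column.
  case class HashedByKeyBroadcastMode(keyIndex: Int) extends BroadcastMode {
    override def transform(rows: Array[Row]): Map[Any, Seq[Row]] =
      rows.toSeq.groupBy(_(keyIndex))
  }

  // The requirement an operator declares, and the partitioning a BroadcastExchange reports.
  case class BroadcastDistribution(mode: BroadcastMode)
  case class BroadcastPartitioning(mode: BroadcastMode) {
    // A broadcast output only satisfies a broadcast requirement with the same mode.
    def satisfies(required: BroadcastDistribution): Boolean = required.mode == mode
  }

  def main(args: Array[String]): Unit = {
    val rows = Array[Row](Seq(1, "a"), Seq(2, "b"), Seq(1, "c"))
    println(HashedByKeyBroadcastMode(0).transform(rows)(1))  // the rows with key 1
    println(BroadcastPartitioning(IdentityBroadcastMode)
      .satisfies(BroadcastDistribution(IdentityBroadcastMode)))  // true
  }
}
```

The mode-equality check in `satisfies` mirrors what the patch adds to `BroadcastPartitioning` below; it is what lets the planner decide whether a child already provides the requested broadcast or whether a `BroadcastExchange` must be inserted.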

cc rxin davies

Author: Herman van Hovell <hv...@questtec.nl>

Closes #11083 from hvanhovell/SPARK-13136.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6a873d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6a873d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6a873d6

Branch: refs/heads/master
Commit: b6a873d6d4682796f55dbafadd0b5cad881f96ea
Parents: af441dd
Author: Herman van Hovell <hv...@questtec.nl>
Authored: Sun Feb 21 12:32:31 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Feb 21 12:32:31 2016 -0800

----------------------------------------------------------------------
 .../catalyst/plans/physical/broadcastMode.scala |  35 ++
 .../catalyst/plans/physical/partitioning.scala  |  30 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  12 +-
 .../apache/spark/sql/execution/Exchange.scala   | 496 -------------------
 .../sql/execution/ExchangeCoordinator.scala     | 271 ----------
 .../apache/spark/sql/execution/SparkPlan.scala  |  34 +-
 .../spark/sql/execution/SparkStrategies.scala   |   8 +-
 .../spark/sql/execution/WholeStageCodegen.scala |   5 +
 .../execution/exchange/BroadcastExchange.scala  |  89 ++++
 .../execution/exchange/EnsureRequirements.scala | 261 ++++++++++
 .../exchange/ExchangeCoordinator.scala          | 273 ++++++++++
 .../execution/exchange/ShuffleExchange.scala    | 261 ++++++++++
 .../sql/execution/joins/BroadcastHashJoin.scala |  75 +--
 .../joins/BroadcastLeftSemiJoinHash.scala       |  25 +-
 .../joins/BroadcastNestedLoopJoin.scala         |  25 +-
 .../sql/execution/joins/HashSemiJoin.scala      |  51 +-
 .../sql/execution/joins/HashedRelation.scala    |  22 +-
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |  23 +-
 .../org/apache/spark/sql/execution/limit.scala  |   7 +-
 .../org/apache/spark/sql/CachedTableSuite.scala |   4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   4 +-
 .../execution/ExchangeCoordinatorSuite.scala    |  21 +-
 .../spark/sql/execution/ExchangeSuite.scala     |   3 +-
 .../spark/sql/execution/PlannerSuite.scala      |  21 +-
 .../execution/joins/BroadcastJoinSuite.scala    |   5 +-
 .../sql/execution/joins/InnerJoinSuite.scala    |  11 +-
 .../sql/execution/joins/OuterJoinSuite.scala    |   3 +-
 .../sql/execution/joins/SemiJoinSuite.scala     |   3 +-
 .../spark/sql/sources/BucketedReadSuite.scala   |  13 +-
 29 files changed, 1158 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
new file mode 100644
index 0000000..c646dcf
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.physical
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
+ * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
+ */
+trait BroadcastMode {
+  def transform(rows: Array[InternalRow]): Any
+}
+
+/**
+ * IdentityBroadcastMode requires that rows are broadcasted in their original form.
+ */
+case object IdentityBroadcastMode extends BroadcastMode {
+  override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index d6e10c4..45e2841 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.physical
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{DataType, IntegerType}
 
@@ -76,6 +77,12 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
 }
 
 /**
+  * Represents data where tuples are broadcasted to every node. It is quite common that the
+  * entire set of tuples is transformed into different data structure.
+  */
+case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+
+/**
  * Describes how an operator's output is split across partitions. The `compatibleWith`,
  * `guarantees`, and `satisfies` methods describe relationships between child partitionings,
  * target partitionings, and [[Distribution]]s. These relations are described more precisely in
@@ -213,7 +220,10 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 
-  override def satisfies(required: Distribution): Boolean = true
+  override def satisfies(required: Distribution): Boolean = required match {
+    case _: BroadcastDistribution => false
+    case _ => true
+  }
 
   override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1
 
@@ -351,3 +361,21 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
     partitionings.map(_.toString).mkString("(", " or ", ")")
   }
 }
+
+/**
+ * Represents a partitioning where rows are collected, transformed and broadcasted to each
+ * node in the cluster.
+ */
+case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
+  override val numPartitions: Int = 1
+
+  override def satisfies(required: Distribution): Boolean = required match {
+    case BroadcastDistribution(m) if m == mode => true
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning): Boolean = other match {
+    case BroadcastPartitioning(m) if m == mode => true
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 932df36..a2f3868 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan,
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
@@ -59,7 +60,6 @@ import org.apache.spark.util.Utils
  * @groupname config Configuration
  * @groupname dataframes Custom DataFrame Creation
  * @groupname Ungrouped Support functions for language integrated queries
- *
  * @since 1.0.0
  */
 class SQLContext private[sql](
@@ -313,10 +313,10 @@ class SQLContext private[sql](
   }
 
   /**
-    * Returns true if the [[Queryable]] is currently cached in-memory.
-    * @group cachemgmt
-    * @since 1.3.0
-    */
+   * Returns true if the [[Queryable]] is currently cached in-memory.
+   * @group cachemgmt
+   * @since 1.3.0
+   */
   private[sql] def isCached(qName: Queryable): Boolean = {
     cacheManager.lookupCachedData(qName).nonEmpty
   }
@@ -364,6 +364,7 @@ class SQLContext private[sql](
 
     /**
      * Converts $"col name" into an [[Column]].
+     *
      * @since 1.3.0
      */
     // This must live here to preserve binary compatibility with Spark < 1.5.
@@ -728,7 +729,6 @@ class SQLContext private[sql](
    * cached/persisted before, it's also unpersisted.
    *
    * @param tableName the name of the table to be unregistered.
-   *
    * @group basic
    * @since 1.3.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
deleted file mode 100644
index e30adef..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util.Random
-
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.serializer.Serializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.util.MutablePair
-
-/**
- * Performs a shuffle that will result in the desired `newPartitioning`.
- */
-case class Exchange(
-    var newPartitioning: Partitioning,
-    child: SparkPlan,
-    @transient coordinator: Option[ExchangeCoordinator]) extends UnaryNode {
-
-  override def nodeName: String = {
-    val extraInfo = coordinator match {
-      case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
-        s"(coordinator id: ${System.identityHashCode(coordinator)})"
-      case Some(exchangeCoordinator) if !exchangeCoordinator.isEstimated =>
-        s"(coordinator id: ${System.identityHashCode(coordinator)})"
-      case None => ""
-    }
-
-    val simpleNodeName = "Exchange"
-    s"$simpleNodeName$extraInfo"
-  }
-
-  override def outputPartitioning: Partitioning = newPartitioning
-
-  override def output: Seq[Attribute] = child.output
-
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-
-  override protected def doPrepare(): Unit = {
-    // If an ExchangeCoordinator is needed, we register this Exchange operator
-    // to the coordinator when we do prepare. It is important to make sure
-    // we register this operator right before the execution instead of register it
-    // in the constructor because it is possible that we create new instances of
-    // Exchange operators when we transform the physical plan
-    // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
-    // So, we should only call registerExchange just before we start to execute
-    // the plan.
-    coordinator match {
-      case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
-    }
-  }
-
-  /**
-   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
-   * the partitioning scheme defined in `newPartitioning`. Those partitions of
-   * the returned ShuffleDependency will be the input of shuffle.
-   */
-  private[sql] def prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] = {
-    Exchange.prepareShuffleDependency(child.execute(), child.output, newPartitioning, serializer)
-  }
-
-  /**
-   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
-   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
-   * partition start indices array. If this optional array is defined, the returned
-   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
-   */
-  private[sql] def preparePostShuffleRDD(
-      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
-      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
-    // If an array of partition start indices is provided, we need to use this array
-    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
-    // update the number of post-shuffle partitions.
-    specifiedPartitionStartIndices.foreach { indices =>
-      assert(newPartitioning.isInstanceOf[HashPartitioning])
-      newPartitioning = UnknownPartitioning(indices.length)
-    }
-    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    coordinator match {
-      case Some(exchangeCoordinator) =>
-        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
-        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
-        shuffleRDD
-      case None =>
-        val shuffleDependency = prepareShuffleDependency()
-        preparePostShuffleRDD(shuffleDependency)
-    }
-  }
-}
-
-object Exchange {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
-    Exchange(newPartitioning, child, coordinator = None: Option[ExchangeCoordinator])
-  }
-
-  /**
-   * Determines whether records must be defensively copied before being sent to the shuffle.
-   * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
-   * shuffle code assumes that objects are immutable and hence does not perform its own defensive
-   * copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In
-   * order to properly shuffle the output of these operators, we need to perform our own copying
-   * prior to sending records to the shuffle. This copying is expensive, so we try to avoid it
-   * whenever possible. This method encapsulates the logic for choosing when to copy.
-   *
-   * In the long run, we might want to push this logic into core's shuffle APIs so that we don't
-   * have to rely on knowledge of core internals here in SQL.
-   *
-   * See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue.
-   *
-   * @param partitioner the partitioner for the shuffle
-   * @param serializer the serializer that will be used to write rows
-   * @return true if rows should be copied before being shuffled, false otherwise
-   */
-  private def needToCopyObjectsBeforeShuffle(
-      partitioner: Partitioner,
-      serializer: Serializer): Boolean = {
-    // Note: even though we only use the partitioner's `numPartitions` field, we require it to be
-    // passed instead of directly passing the number of partitions in order to guard against
-    // corner-cases where a partitioner constructed with `numPartitions` partitions may output
-    // fewer partitions (like RangePartitioner, for example).
-    val conf = SparkEnv.get.conf
-    val shuffleManager = SparkEnv.get.shuffleManager
-    val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
-    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
-    if (sortBasedShuffleOn) {
-      val bypassIsSupported = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
-      if (bypassIsSupported && partitioner.numPartitions <= bypassMergeThreshold) {
-        // If we're using the original SortShuffleManager and the number of output partitions is
-        // sufficiently small, then Spark will fall back to the hash-based shuffle write path, which
-        // doesn't buffer deserialized records.
-        // Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass.
-        false
-      } else if (serializer.supportsRelocationOfSerializedObjects) {
-        // SPARK-4550 and  SPARK-7081 extended sort-based shuffle to serialize individual records
-        // prior to sorting them. This optimization is only applied in cases where shuffle
-        // dependency does not specify an aggregator or ordering and the record serializer has
-        // certain properties. If this optimization is enabled, we can safely avoid the copy.
-        //
-        // Exchange never configures its ShuffledRDDs with aggregators or key orderings, so we only
-        // need to check whether the optimization is enabled and supported by our serializer.
-        false
-      } else {
-        // Spark's SortShuffleManager uses `ExternalSorter` to buffer records in memory, so we must
-        // copy.
-        true
-      }
-    } else if (shuffleManager.isInstanceOf[HashShuffleManager]) {
-      // We're using hash-based shuffle, so we don't need to copy.
-      false
-    } else {
-      // Catch-all case to safely handle any future ShuffleManager implementations.
-      true
-    }
-  }
-
-  /**
-   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
-   * the partitioning scheme defined in `newPartitioning`. Those partitions of
-   * the returned ShuffleDependency will be the input of shuffle.
-   */
-  private[sql] def prepareShuffleDependency(
-      rdd: RDD[InternalRow],
-      outputAttributes: Seq[Attribute],
-      newPartitioning: Partitioning,
-      serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {
-    val part: Partitioner = newPartitioning match {
-      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
-      case HashPartitioning(_, n) =>
-        new Partitioner {
-          override def numPartitions: Int = n
-          // For HashPartitioning, the partitioning key is already a valid partition ID, as we use
-          // `HashPartitioning.partitionIdExpression` to produce partitioning key.
-          override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-        }
-      case RangePartitioning(sortingExpressions, numPartitions) =>
-        // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
-        // partition bounds. To get accurate samples, we need to copy the mutable keys.
-        val rddForSampling = rdd.mapPartitionsInternal { iter =>
-          val mutablePair = new MutablePair[InternalRow, Null]()
-          iter.map(row => mutablePair.update(row.copy(), null))
-        }
-        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
-        new RangePartitioner(numPartitions, rddForSampling, ascending = true)
-      case SinglePartition =>
-        new Partitioner {
-          override def numPartitions: Int = 1
-          override def getPartition(key: Any): Int = 0
-        }
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
-      // TODO: Handle BroadcastPartitioning.
-    }
-    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
-      case RoundRobinPartitioning(numPartitions) =>
-        // Distributes elements evenly across output partitions, starting from a random partition.
-        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
-        (row: InternalRow) => {
-          // The HashPartitioner will handle the `mod` by the number of partitions
-          position += 1
-          position
-        }
-      case h: HashPartitioning =>
-        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
-        row => projection(row).getInt(0)
-      case RangePartitioning(_, _) | SinglePartition => identity
-      case _ => sys.error(s"Exchange not implemented for $newPartitioning")
-    }
-    val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = {
-      if (needToCopyObjectsBeforeShuffle(part, serializer)) {
-        rdd.mapPartitionsInternal { iter =>
-          val getPartitionKey = getPartitionKeyExtractor()
-          iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) }
-        }
-      } else {
-        rdd.mapPartitionsInternal { iter =>
-          val getPartitionKey = getPartitionKeyExtractor()
-          val mutablePair = new MutablePair[Int, InternalRow]()
-          iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) }
-        }
-      }
-    }
-
-    // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
-    // are in the form of (partitionId, row) and every partitionId is in the expected range
-    // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
-    val dependency =
-      new ShuffleDependency[Int, InternalRow, InternalRow](
-        rddWithPartitionIds,
-        new PartitionIdPassthrough(part.numPartitions),
-        Some(serializer))
-
-    dependency
-  }
-}
-
-/**
- * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
- * of input data meets the
- * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
- * each operator by inserting [[Exchange]] Operators where required.  Also ensure that the
- * input partition ordering requirements are met.
- */
-private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = sqlContext.conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = sqlContext.conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = sqlContext.conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = sqlContext.conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Given a required distribution, returns a partitioning that satisfies that distribution.
-   */
-  private def createPartitioning(
-      requiredDistribution: Distribution,
-      numPartitions: Int): Partitioning = {
-    requiredDistribution match {
-      case AllTuples => SinglePartition
-      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
-      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
-      case dist => sys.error(s"Do not know how to satisfy distribution $dist")
-    }
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[Exchange]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[Exchange]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[Exchange])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ Exchange(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution =
-          requiredChildDistributions.forall(_.isInstanceOf[ClusteredDistribution])
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            children.length,
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: Exchange, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning =
-              createPartitioning(distribution, defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            Exchange(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
-
-  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
-    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
-    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
-    var children: Seq[SparkPlan] = operator.children
-    assert(requiredChildDistributions.length == children.length)
-    assert(requiredChildOrderings.length == children.length)
-
-    // Ensure that the operator's children satisfy their output distribution requirements:
-    children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
-      if (child.outputPartitioning.satisfies(distribution)) {
-        child
-      } else {
-        Exchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
-      }
-    }
-
-    // If the operator has multiple children and specifies child output distributions (e.g. join),
-    // then the children's output partitionings must be compatible:
-    if (children.length > 1
-        && requiredChildDistributions.toSet != Set(UnspecifiedDistribution)
-        && !Partitioning.allCompatible(children.map(_.outputPartitioning))) {
-
-      // First check if the existing partitions of the children all match. This means they are
-      // partitioned by the same partitioning into the same number of partitions. In that case,
-      // don't try to make them match `defaultPartitions`, just use the existing partitioning.
-      val maxChildrenNumPartitions = children.map(_.outputPartitioning.numPartitions).max
-      val useExistingPartitioning = children.zip(requiredChildDistributions).forall {
-        case (child, distribution) => {
-          child.outputPartitioning.guarantees(
-            createPartitioning(distribution, maxChildrenNumPartitions))
-        }
-      }
-
-      children = if (useExistingPartitioning) {
-        // We do not need to shuffle any child's output.
-        children
-      } else {
-        // We need to shuffle at least one child's output.
-        // Now, we will determine the number of partitions that will be used by created
-        // partitioning schemes.
-        val numPartitions = {
-          // Let's see if we need to shuffle all child's outputs when we use
-          // maxChildrenNumPartitions.
-          val shufflesAllChildren = children.zip(requiredChildDistributions).forall {
-            case (child, distribution) => {
-              !child.outputPartitioning.guarantees(
-                createPartitioning(distribution, maxChildrenNumPartitions))
-            }
-          }
-          // If we need to shuffle all children, we use defaultNumPreShufflePartitions as the
-          // number of partitions. Otherwise, we use maxChildrenNumPartitions.
-          if (shufflesAllChildren) defaultNumPreShufflePartitions else maxChildrenNumPartitions
-        }
-
-        children.zip(requiredChildDistributions).map {
-          case (child, distribution) => {
-            val targetPartitioning =
-              createPartitioning(distribution, numPartitions)
-            if (child.outputPartitioning.guarantees(targetPartitioning)) {
-              child
-            } else {
-              child match {
-                // If child is an exchange, we replace it with
-                // a new one having targetPartitioning.
-                case Exchange(_, c, _) => Exchange(targetPartitioning, c)
-                case _ => Exchange(targetPartitioning, child)
-              }
-            }
-          }
-        }
-      }
-    }
-
-    // Now, we need to add ExchangeCoordinator if necessary.
-    // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
-    // However, with the way that we plan the query, we do not have a place where we have a
-    // global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
-    // at here for now.
-    // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
-    // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
-    children = withExchangeCoordinator(children, requiredChildDistributions)
-
-    // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
-    children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
-      if (requiredOrdering.nonEmpty) {
-        // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
-        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
-          Sort(requiredOrdering, global = false, child = child)
-        } else {
-          child
-        }
-      } else {
-        child
-      }
-    }
-
-    operator.withNewChildren(children)
-  }
-
-  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case operator @ Exchange(partitioning, child, _) =>
-      child.children match {
-        case Exchange(childPartitioning, baseChild, _)::Nil =>
-          if (childPartitioning.guarantees(partitioning)) child else operator
-        case _ => operator
-      }
-    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/ExchangeCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExchangeCoordinator.scala
deleted file mode 100644
index 07015e5..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExchangeCoordinator.scala
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util.{HashMap => JHashMap, Map => JMap}
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.{Logging, MapOutputStatistics, ShuffleDependency, SimpleFutureAction}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-
-/**
- * A coordinator used to determines how we shuffle data between stages generated by Spark SQL.
- * Right now, the work of this coordinator is to determine the number of post-shuffle partitions
- * for a stage that needs to fetch shuffle data from one or multiple stages.
- *
- * A coordinator is constructed with three parameters, `numExchanges`,
- * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
- *  - `numExchanges` is used to indicated that how many [[Exchange]]s that will be registered to
- *    this coordinator. So, when we start to do any actual work, we have a way to make sure that
- *    we have got expected number of [[Exchange]]s.
- *  - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's
- *    input data size. With this parameter, we can estimate the number of post-shuffle partitions.
- *    This parameter is configured through
- *    `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
- *  - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
- *    will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
- *    partitions.
- *
- * The workflow of this coordinator is described as follows:
- *  - Before the execution of a [[SparkPlan]], for an [[Exchange]] operator,
- *    if an [[ExchangeCoordinator]] is assigned to it, it registers itself to this coordinator.
- *    This happens in the `doPrepare` method.
- *  - Once we start to execute a physical plan, an [[Exchange]] registered to this coordinator will
- *    call `postShuffleRDD` to get its corresponding post-shuffle [[ShuffledRowRDD]].
- *    If this coordinator has made the decision on how to shuffle data, this [[Exchange]] will
- *    immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
- *  - If this coordinator has not made the decision on how to shuffle data, it will ask those
- *    registered [[Exchange]]s to submit their pre-shuffle stages. Then, based on the the size
- *    statistics of pre-shuffle partitions, this coordinator will determine the number of
- *    post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
- *    to a single post-shuffle partition whenever necessary.
- *  - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
- *    [[Exchange]]s. So, when an [[Exchange]] calls `postShuffleRDD`, this coordinator can
- *    lookup the corresponding [[RDD]].
- *
- * The strategy used to determine the number of post-shuffle partitions is described as follows.
- * To determine the number of post-shuffle partitions, we have a target input size for a
- * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
- * corresponding to the registered [[Exchange]]s, we will do a pass of those statistics and
- * pack pre-shuffle partitions with continuous indices to a single post-shuffle partition until
- * the size of a post-shuffle partition is equal or greater than the target size.
- * For example, we have two stages with the following pre-shuffle partition size statistics:
- * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB]
- * stage 2: [10 MB,  10 MB, 70 MB,  5 MB, 5 MB]
- * assuming the target input size is 128 MB, we will have three post-shuffle partitions,
- * which are:
- *  - post-shuffle partition 0: pre-shuffle partition 0 and 1
- *  - post-shuffle partition 1: pre-shuffle partition 2
- *  - post-shuffle partition 2: pre-shuffle partition 3 and 4
- */
-private[sql] class ExchangeCoordinator(
-    numExchanges: Int,
-    advisoryTargetPostShuffleInputSize: Long,
-    minNumPostShufflePartitions: Option[Int] = None)
-  extends Logging {
-
-  // The registered Exchange operators.
-  private[this] val exchanges = ArrayBuffer[Exchange]()
-
-  // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[Exchange, ShuffledRowRDD] =
-    new JHashMap[Exchange, ShuffledRowRDD](numExchanges)
-
-  // A boolean that indicates if this coordinator has made decision on how to shuffle data.
-  // This variable will only be updated by doEstimationIfNecessary, which is protected by
-  // synchronized.
-  @volatile private[this] var estimated: Boolean = false
-
-  /**
-   * Registers an [[Exchange]] operator to this coordinator. This method is only allowed to be
-   * called in the `doPrepare` method of an [[Exchange]] operator.
-   */
-  @GuardedBy("this")
-  def registerExchange(exchange: Exchange): Unit = synchronized {
-    exchanges += exchange
-  }
-
-  def isEstimated: Boolean = estimated
-
-  /**
-   * Estimates partition start indices for post-shuffle partitions based on
-   * mapOutputStatistics provided by all pre-shuffle stages.
-   */
-  private[sql] def estimatePartitionStartIndices(
-      mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-    // a stage when the number of partitions of this dependency is 0.
-    assert(mapOutputStatistics.length <= numExchanges)
-
-    // If minNumPostShufflePartitions is defined, it is possible that we need to use a
-    // value less than advisoryTargetPostShuffleInputSize as the target input size of
-    // a post shuffle task.
-    val targetPostShuffleInputSize = minNumPostShufflePartitions match {
-      case Some(numPartitions) =>
-        val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
-        // The max at here is to make sure that when we have an empty table, we
-        // only have a single post-shuffle partition.
-        // There is no particular reason that we pick 16. We just need a number to
-        // prevent maxPostShuffleInputSize from being set to 0.
-        val maxPostShuffleInputSize =
-          math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
-        math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
-
-      case None => advisoryTargetPostShuffleInputSize
-    }
-
-    logInfo(
-      s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
-      s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
-
-    // Make sure we do get the same number of pre-shuffle partitions for those stages.
-    val distinctNumPreShufflePartitions =
-      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
-    // The reason that we are expecting a single value of the number of pre-shuffle partitions
-    // is that when we add Exchanges, we set the number of pre-shuffle partitions
-    // (i.e. map output partitions) using a static setting, which is the value of
-    // spark.sql.shuffle.partitions. Even if two input RDDs are having different
-    // number of partitions, they will have the same number of pre-shuffle partitions
-    // (i.e. map output partitions).
-    assert(
-      distinctNumPreShufflePartitions.length == 1,
-      "There should be only one distinct value of the number pre-shuffle partitions " +
-        "among registered Exchange operator.")
-    val numPreShufflePartitions = distinctNumPreShufflePartitions.head
-
-    val partitionStartIndices = ArrayBuffer[Int]()
-    // The first element of partitionStartIndices is always 0.
-    partitionStartIndices += 0
-
-    var postShuffleInputSize = 0L
-
-    var i = 0
-    while (i < numPreShufflePartitions) {
-      // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
-      // Then, we add the total size to postShuffleInputSize.
-      var j = 0
-      while (j < mapOutputStatistics.length) {
-        postShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
-        j += 1
-      }
-
-      // If the current postShuffleInputSize is equal or greater than the
-      // targetPostShuffleInputSize, We need to add a new element in partitionStartIndices.
-      if (postShuffleInputSize >= targetPostShuffleInputSize) {
-        if (i < numPreShufflePartitions - 1) {
-          // Next start index.
-          partitionStartIndices += i + 1
-        } else {
-          // This is the last element. So, we do not need to append the next start index to
-          // partitionStartIndices.
-        }
-        // reset postShuffleInputSize.
-        postShuffleInputSize = 0L
-      }
-
-      i += 1
-    }
-
-    partitionStartIndices.toArray
-  }
-
-  @GuardedBy("this")
-  private def doEstimationIfNecessary(): Unit = synchronized {
-    // It is unlikely that this method will be called from multiple threads
-    // (when multiple threads trigger the execution of THIS physical)
-    // because in common use cases, we will create new physical plan after
-    // users apply operations (e.g. projection) to an existing DataFrame.
-    // However, if it happens, we have synchronized to make sure only one
-    // thread will trigger the job submission.
-    if (!estimated) {
-      // Make sure we have the expected number of registered Exchange operators.
-      assert(exchanges.length == numExchanges)
-
-      val newPostShuffleRDDs = new JHashMap[Exchange, ShuffledRowRDD](numExchanges)
-
-      // Submit all map stages
-      val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
-      val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
-      var i = 0
-      while (i < numExchanges) {
-        val exchange = exchanges(i)
-        val shuffleDependency = exchange.prepareShuffleDependency()
-        shuffleDependencies += shuffleDependency
-        if (shuffleDependency.rdd.partitions.length != 0) {
-          // submitMapStage does not accept RDD with 0 partition.
-          // So, we will not submit this dependency.
-          submittedStageFutures +=
-            exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
-        }
-        i += 1
-      }
-
-      // Wait for the finishes of those submitted map stages.
-      val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
-      var j = 0
-      while (j < submittedStageFutures.length) {
-        // This call is a blocking call. If the stage has not finished, we will wait at here.
-        mapOutputStatistics(j) = submittedStageFutures(j).get()
-        j += 1
-      }
-
-      // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
-      // number of post-shuffle partitions.
-      val partitionStartIndices =
-        if (mapOutputStatistics.length == 0) {
-          None
-        } else {
-          Some(estimatePartitionStartIndices(mapOutputStatistics))
-        }
-
-      var k = 0
-      while (k < numExchanges) {
-        val exchange = exchanges(k)
-        val rdd =
-          exchange.preparePostShuffleRDD(shuffleDependencies(k), partitionStartIndices)
-        newPostShuffleRDDs.put(exchange, rdd)
-
-        k += 1
-      }
-
-      // Finally, we set postShuffleRDDs and estimated.
-      assert(postShuffleRDDs.isEmpty)
-      assert(newPostShuffleRDDs.size() == numExchanges)
-      postShuffleRDDs.putAll(newPostShuffleRDDs)
-      estimated = true
-    }
-  }
-
-  def postShuffleRDD(exchange: Exchange): ShuffledRowRDD = {
-    doEstimationIfNecessary()
-
-    if (!postShuffleRDDs.containsKey(exchange)) {
-      throw new IllegalStateException(
-        s"The given $exchange is not registered in this coordinator.")
-    }
-
-    postShuffleRDDs.get(exchange)
-  }
-
-  override def toString: String = {
-    s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
-  }
-}
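
The partition-coalescing strategy described in the scaladoc above (which this patch moves, essentially unchanged, into the new exchange package) can be modeled in isolation. Below is a simplified, self-contained sketch that reproduces the 128 MB example from the comment; `estimatePartitionStartIndices` here is a stripped-down stand-in, not the real implementation:

```scala
// Simplified model of the coalescing strategy: walk pre-shuffle partitions in order and
// start a new post-shuffle partition once the accumulated input size reaches the target.
object CoalesceSketch {
  def estimatePartitionStartIndices(
      bytesByPartitionId: Seq[Array[Long]],  // one size array per pre-shuffle stage
      targetSize: Long): Array[Int] = {
    val numPreShufflePartitions = bytesByPartitionId.head.length
    val startIndices = scala.collection.mutable.ArrayBuffer(0)
    var currentSize = 0L
    for (i <- 0 until numPreShufflePartitions) {
      // Total size of the i-th pre-shuffle partition across all stages.
      currentSize += bytesByPartitionId.map(_(i)).sum
      if (currentSize >= targetSize) {
        if (i < numPreShufflePartitions - 1) startIndices += i + 1
        currentSize = 0L
      }
    }
    startIndices.toArray
  }

  def main(args: Array[String]): Unit = {
    val mb = 1L << 20
    val stage1 = Array(100 * mb, 20 * mb, 100 * mb, 10 * mb, 30 * mb)
    val stage2 = Array(10 * mb, 10 * mb, 70 * mb, 5 * mb, 5 * mb)
    // Reproduces the scaladoc example: three post-shuffle partitions {0,1}, {2}, {3,4}.
    println(estimatePartitionStartIndices(Seq(stage1, stage2), 128 * mb).mkString(", "))  // 0, 2, 3
  }
}
```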

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 477a946..3be4cce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -24,6 +24,7 @@ import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.concurrent.duration._
 
 import org.apache.spark.Logging
+import org.apache.spark.broadcast
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -108,15 +109,30 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)
 
   /**
-   * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
-   * after adding query plan information to created RDDs for visualization.
-   * Concrete implementations of SparkPlan should override doExecute instead.
+   * Returns the result of this query as an RDD[InternalRow] by delegating to doExecute after
+   * preparations. Concrete implementations of SparkPlan should override doExecute.
    */
-  final def execute(): RDD[InternalRow] = {
+  final def execute(): RDD[InternalRow] = executeQuery {
+    doExecute()
+  }
+
+  /**
+   * Returns the result of this query as a broadcast variable by delegating to doBroadcast after
+   * preparations. Concrete implementations of SparkPlan should override doBroadcast.
+   */
+  final def executeBroadcast[T](): broadcast.Broadcast[T] = executeQuery {
+    doExecuteBroadcast()
+  }
+
+  /**
+   * Execute a query after preparing the query and adding query plan information to created RDDs
+   * for visualization.
+   */
+  private final def executeQuery[T](query: => T): T = {
     RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
       prepare()
       waitForSubqueries()
-      doExecute()
+      query
     }
   }
 
@@ -193,6 +209,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   protected def doExecute(): RDD[InternalRow]
 
   /**
+   * Overridden by concrete implementations of SparkPlan.
+   * Produces the result of the query as a broadcast variable.
+   */
+  protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    throw new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
+  }
+
+  /**
    * Runs this query returning the result as an array.
    */
   def executeCollect(): Array[InternalRow] = {
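
The execute()/executeBroadcast() pair added above is a small template-method refactor: both public entry points run the shared preparation in executeQuery and then delegate to the protected doExecute/doExecuteBroadcast hooks, with doExecuteBroadcast failing fast for operators that cannot produce a broadcast. A minimal, self-contained sketch of that shape (not the real SparkPlan class):

```scala
// Simplified illustration of the execute()/executeBroadcast() template; not the real SparkPlan.
object ExecuteBroadcastSketch {
  abstract class PlanSketch {
    // Shared preparation wrapper used by both code paths (stand-in for executeQuery).
    private def executeQuery[T](query: => T): T = { prepare(); query }

    final def execute(): Seq[Any] = executeQuery { doExecute() }
    final def executeBroadcast[T](): T = executeQuery { doExecuteBroadcast[T]() }

    protected def prepare(): Unit = ()
    protected def doExecute(): Seq[Any]
    // Only broadcast-capable operators override this; everything else fails fast.
    protected def doExecuteBroadcast[T](): T =
      throw new UnsupportedOperationException(s"${getClass.getSimpleName} cannot broadcast")
  }

  class BroadcastCapable(rows: Seq[Any]) extends PlanSketch {
    override protected def doExecute(): Seq[Any] = rows
    override protected def doExecuteBroadcast[T](): T = rows.asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val plan = new BroadcastCapable(Seq(1, 2, 3))
    println(plan.execute())                     // List(1, 2, 3)
    println(plan.executeBroadcast[Seq[Any]]())  // List(1, 2, 3)
  }
}
```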

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 382654a..7347156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -25,6 +26,7 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
 import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
@@ -328,7 +330,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
-          execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+          ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
         } else {
           execution.Coalesce(numPartitions, planLater(child)) :: Nil
         }
@@ -367,7 +369,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r @ logical.Range(start, end, step, numSlices, output) =>
         execution.Range(start, step, numSlices, r.numElements, output) :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
-        execution.Exchange(HashPartitioning(
+        exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
       case e @ python.EvaluatePython(udf, child, _) =>
         python.BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
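
With the strategy changes above, both explicit repartition code paths are now planned through ShuffleExchange. A quick, hedged way to observe this from spark-shell (assuming the shell-provided `sqlContext`; the operator may still render as "Exchange" in the explain output):

```scala
// Sketch: inspect the plans produced by the updated strategies.
val df = sqlContext.range(0, 1000)
df.repartition(10).explain()            // Repartition -> shuffle with RoundRobinPartitioning(10)
df.repartition(10, df("id")).explain()  // RepartitionByExpression -> shuffle with HashPartitioning on `id`
```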

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 990eeb2..d79b547 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
@@ -172,6 +173,10 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
     child.execute()
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    child.doExecuteBroadcast()
+  }
+
   override def supportCodegen: Boolean = false
 
   override def upstream(): RDD[InternalRow] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
new file mode 100644
index 0000000..40cad4b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._
+
+import org.apache.spark.broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryNode}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A [[BroadcastExchange]] collects, transforms and finally broadcasts the result of its child
+ * SparkPlan.
+ */
+case class BroadcastExchange(
+    mode: BroadcastMode,
+    child: SparkPlan) extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
+
+  @transient
+  private val timeout: Duration = {
+    val timeoutValue = sqlContext.conf.broadcastTimeout
+    if (timeoutValue < 0) {
+      Duration.Inf
+    } else {
+      timeoutValue.seconds
+    }
+  }
+
+  @transient
+  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
+    // relationFuture is used in "doExecuteBroadcast". Therefore we can get the execution id
+    // correctly here.
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    Future {
+      // This will run in another thread. Set the execution id so that we can connect these jobs
+      // with the correct execution.
+      SQLExecution.withExecutionId(sparkContext, executionId) {
+        // Note that we use .executeCollect() because we don't want to convert data to Scala types
+        val input: Array[InternalRow] = child.executeCollect()
+
+        // Construct and broadcast the relation.
+        sparkContext.broadcast(mode.transform(input))
+      }
+    }(BroadcastExchange.executionContext)
+  }
+
+  override protected def doPrepare(): Unit = {
+    // Materialize the future.
+    relationFuture
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(
+      "BroadcastExchange does not support the execute() code path.")
+  }
+
+  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    val result = Await.result(relationFuture, timeout)
+    result.asInstanceOf[broadcast.Broadcast[T]]
+  }
+}
+
+object BroadcastExchange {
+  private[execution] val executionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
+}
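
For reference, the BroadcastMode used above boils down to turning the collected rows into the
object that gets broadcast. A simplified, self-contained sketch of that contract (illustration
only; the real trait lives in catalyst's broadcastMode.scala, and IdentityBroadcastMode here
mirrors the identity mode added by this patch):

    import org.apache.spark.sql.catalyst.InternalRow

    // Simplified stand-in for the BroadcastMode trait used by BroadcastExchange.
    trait BroadcastMode {
      // Transform the rows collected by BroadcastExchange into the value to broadcast.
      def transform(rows: Array[InternalRow]): Any
    }

    // Broadcast the rows unchanged; other modes (e.g. a HashedRelation-based mode)
    // would build an index here instead.
    case object IdentityBroadcastMode extends BroadcastMode {
      override def transform(rows: Array[InternalRow]): Any = rows
    }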

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
new file mode 100644
index 0000000..709a424
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+
+/**
+ * Ensures that the [[org.apache.spark.sql.catalyst.plans.physical.Partitioning Partitioning]]
+ * of input data meets the
+ * [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
+ * each operator by inserting [[ShuffleExchange]] operators where required.  Also ensures that the
+ * input partition ordering requirements are met.
+ */
+private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
+  private def defaultNumPreShufflePartitions: Int = sqlContext.conf.numShufflePartitions
+
+  private def targetPostShuffleInputSize: Long = sqlContext.conf.targetPostShuffleInputSize
+
+  private def adaptiveExecutionEnabled: Boolean = sqlContext.conf.adaptiveExecutionEnabled
+
+  private def minNumPostShufflePartitions: Option[Int] = {
+    val minNumPostShufflePartitions = sqlContext.conf.minNumPostShufflePartitions
+    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
+  }
+
+  /**
+   * Given a required distribution, returns a partitioning that satisfies that distribution.
+   */
+  private def createPartitioning(
+      requiredDistribution: Distribution,
+      numPartitions: Int): Partitioning = {
+    requiredDistribution match {
+      case AllTuples => SinglePartition
+      case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
+      case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
+      case dist => sys.error(s"Do not know how to satisfy distribution $dist")
+    }
+  }
+
+  /**
+   * Adds [[ExchangeCoordinator]] to [[ShuffleExchange]]s if adaptive query execution is enabled
+   * and partitioning schemes of these [[ShuffleExchange]]s support [[ExchangeCoordinator]].
+   */
+  private def withExchangeCoordinator(
+      children: Seq[SparkPlan],
+      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
+    val supportsCoordinator =
+      if (children.exists(_.isInstanceOf[ShuffleExchange])) {
+        // Right now, ExchangeCoordinator only supports HashPartitioning.
+        children.forall {
+          case e @ ShuffleExchange(hash: HashPartitioning, _, _) => true
+          case child =>
+            child.outputPartitioning match {
+              case hash: HashPartitioning => true
+              case collection: PartitioningCollection =>
+                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
+              case _ => false
+            }
+        }
+      } else {
+        // In this case, although we do not have Exchange operators, we may still need to
+        // shuffle data when we have more than one child because data generated by
+        // these children may not be partitioned in the same way.
+        // Please see the comment in withCoordinator for more details.
+        val supportsDistribution =
+          requiredChildDistributions.forall(_.isInstanceOf[ClusteredDistribution])
+        children.length > 1 && supportsDistribution
+      }
+
+    val withCoordinator =
+      if (adaptiveExecutionEnabled && supportsCoordinator) {
+        val coordinator =
+          new ExchangeCoordinator(
+            children.length,
+            targetPostShuffleInputSize,
+            minNumPostShufflePartitions)
+        children.zip(requiredChildDistributions).map {
+          case (e: ShuffleExchange, _) =>
+            // This child is an Exchange, we need to add the coordinator.
+            e.copy(coordinator = Some(coordinator))
+          case (child, distribution) =>
+            // If this child is not an Exchange, we need to add an Exchange for now.
+            // Ideally, we can try to avoid this Exchange. However, when we reach here,
+            // there are at least two child operators (because if there is a single child
+            // and we can avoid the Exchange, supportsCoordinator will be false and we
+            // will not reach here). Although we can make the two children have the same number
+            // of post-shuffle partitions, their numbers of pre-shuffle partitions may differ.
+            // For example, let's say we have the following plan
+            //         Join
+            //         /  \
+            //       Agg  Exchange
+            //       /      \
+            //    Exchange  t2
+            //      /
+            //     t1
+            // In this case, because a post-shuffle partition can include multiple pre-shuffle
+            // partitions, a HashPartitioning will not be strictly partitioned by the hash codes
+            // after shuffle. So, even if we can use the child Exchange operator of the Join to
+            // have a number of post-shuffle partitions that matches the number of partitions of
+            // Agg, we cannot say these two children are partitioned in the same way.
+            // Here is another case
+            //         Join
+            //         /  \
+            //       Agg1  Agg2
+            //       /      \
+            //   Exchange1  Exchange2
+            //       /       \
+            //      t1       t2
+            // In this case, two Aggs shuffle data with the same column of the join condition.
+            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
+            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
+            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
+            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
+            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
+            // So, Agg1 and Agg2 are actually not co-partitioned.
+            //
+            // It will be great to introduce a new Partitioning to represent the post-shuffle
+            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
+            val targetPartitioning =
+              createPartitioning(distribution, defaultNumPreShufflePartitions)
+            assert(targetPartitioning.isInstanceOf[HashPartitioning])
+            ShuffleExchange(targetPartitioning, child, Some(coordinator))
+        }
+      } else {
+        // If we do not need ExchangeCoordinator, the original children are returned.
+        children
+      }
+
+    withCoordinator
+  }
+
+  private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
+    val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
+    val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
+    var children: Seq[SparkPlan] = operator.children
+    assert(requiredChildDistributions.length == children.length)
+    assert(requiredChildOrderings.length == children.length)
+
+    // Ensure that the operator's children satisfy their output distribution requirements:
+    children = children.zip(requiredChildDistributions).map {
+      case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
+        child
+      case (child, BroadcastDistribution(mode)) =>
+        BroadcastExchange(mode, child)
+      case (child, distribution) =>
+        ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child)
+    }
+
+    // If the operator has multiple children and specifies child output distributions (e.g. join),
+    // then the children's output partitionings must be compatible:
+    def requireCompatiblePartitioning(distribution: Distribution): Boolean = distribution match {
+      case UnspecifiedDistribution => false
+      case BroadcastDistribution(_) => false
+      case _ => true
+    }
+    if (children.length > 1
+        && requiredChildDistributions.exists(requireCompatiblePartitioning)
+        && !Partitioning.allCompatible(children.map(_.outputPartitioning))) {
+
+      // First check if the existing partitions of the children all match. This means they are
+      // partitioned by the same partitioning into the same number of partitions. In that case,
+      // don't try to make them match `defaultNumPreShufflePartitions`; just use the existing
+      // partitioning.
+      val maxChildrenNumPartitions = children.map(_.outputPartitioning.numPartitions).max
+      val useExistingPartitioning = children.zip(requiredChildDistributions).forall {
+        case (child, distribution) =>
+          child.outputPartitioning.guarantees(
+            createPartitioning(distribution, maxChildrenNumPartitions))
+      }
+
+      children = if (useExistingPartitioning) {
+        // We do not need to shuffle any child's output.
+        children
+      } else {
+        // We need to shuffle at least one child's output.
+        // Now, we will determine the number of partitions that will be used by created
+        // partitioning schemes.
+        val numPartitions = {
+          // Let's see if we need to shuffle all children's outputs when we use
+          // maxChildrenNumPartitions.
+          val shufflesAllChildren = children.zip(requiredChildDistributions).forall {
+            case (child, distribution) =>
+              !child.outputPartitioning.guarantees(
+                createPartitioning(distribution, maxChildrenNumPartitions))
+          }
+          // If we need to shuffle all children, we use defaultNumPreShufflePartitions as the
+          // number of partitions. Otherwise, we use maxChildrenNumPartitions.
+          if (shufflesAllChildren) defaultNumPreShufflePartitions else maxChildrenNumPartitions
+        }
+
+        children.zip(requiredChildDistributions).map {
+          case (child, distribution) =>
+            val targetPartitioning = createPartitioning(distribution, numPartitions)
+            if (child.outputPartitioning.guarantees(targetPartitioning)) {
+              child
+            } else {
+              child match {
+                // If child is an exchange, we replace it with
+                // a new one having targetPartitioning.
+                case ShuffleExchange(_, c, _) => ShuffleExchange(targetPartitioning, c)
+                case _ => ShuffleExchange(targetPartitioning, child)
+              }
+            }
+        }
+      }
+    }
+
+    // Now, we need to add ExchangeCoordinator if necessary.
+    // Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
+    // However, with the way that we plan the query, we do not have a place where we have a
+    // global picture of all shuffle dependencies of a post-shuffle stage. So, we add the
+    // coordinator here for now.
+    // Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
+    // we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
+    children = withExchangeCoordinator(children, requiredChildDistributions)
+
+    // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
+    children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
+      if (requiredOrdering.nonEmpty) {
+        // If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
+        if (requiredOrdering != child.outputOrdering.take(requiredOrdering.length)) {
+          Sort(requiredOrdering, global = false, child = child)
+        } else {
+          child
+        }
+      } else {
+        child
+      }
+    }
+
+    operator.withNewChildren(children)
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+    case operator @ ShuffleExchange(partitioning, child, _) =>
+      child.children match {
+        case ShuffleExchange(childPartitioning, baseChild, _)::Nil =>
+          if (childPartitioning.guarantees(partitioning)) child else operator
+        case _ => operator
+      }
+    case operator: SparkPlan => ensureDistributionAndOrdering(operator)
+  }
+}
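
As a side note, the ordering step at the end of ensureDistributionAndOrdering only inserts a Sort
when the required ordering is not a prefix of the child's output ordering. A self-contained sketch
of that check (SortOrder stubbed as String purely for illustration):

    def needsSort(requiredOrdering: Seq[String], childOutputOrdering: Seq[String]): Boolean =
      requiredOrdering.nonEmpty &&
        requiredOrdering != childOutputOrdering.take(requiredOrdering.length)

    // Child already sorted by [a, b] and the operator requires [a]: no extra sort is added.
    assert(!needsSort(Seq("a"), Seq("a", "b")))
    // Child sorted by [b] and the operator requires [a]: a Sort is inserted.
    assert(needsSort(Seq("a"), Seq("b")))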

http://git-wip-us.apache.org/repos/asf/spark/blob/b6a873d6/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
new file mode 100644
index 0000000..6f3bb0a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.exchange
+
+import java.util.{HashMap => JHashMap, Map => JMap}
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Logging, MapOutputStatistics, ShuffleDependency, SimpleFutureAction}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
+
+/**
+ * A coordinator used to determine how we shuffle data between stages generated by Spark SQL.
+ * Right now, the work of this coordinator is to determine the number of post-shuffle partitions
+ * for a stage that needs to fetch shuffle data from one or multiple stages.
+ *
+ * A coordinator is constructed with three parameters, `numExchanges`,
+ * `targetPostShuffleInputSize`, and `minNumPostShufflePartitions`.
+ *  - `numExchanges` is used to indicate how many [[ShuffleExchange]]s will be registered to this
+ *    coordinator. So, when we start to do any actual work, we have a way to make sure that we
+ *    have got the expected number of [[ShuffleExchange]]s.
+ *  - `targetPostShuffleInputSize` is the targeted size of a post-shuffle partition's input data.
+ *    With this parameter, we can estimate the number of post-shuffle partitions.
+ *    This parameter is configured through
+ *    `spark.sql.adaptive.shuffle.targetPostShuffleInputSize`.
+ *  - `minNumPostShufflePartitions` is an optional parameter. If it is defined, this coordinator
+ *    will try to make sure that there are at least `minNumPostShufflePartitions` post-shuffle
+ *    partitions.
+ *
+ * The workflow of this coordinator is described as follows:
+ *  - Before the execution of a [[SparkPlan]], for a [[ShuffleExchange]] operator,
+ *    if an [[ExchangeCoordinator]] is assigned to it, it registers itself with this coordinator.
+ *    This happens in the `doPrepare` method.
+ *  - Once we start to execute a physical plan, a [[ShuffleExchange]] registered to this
+ *    coordinator will call `postShuffleRDD` to get its corresponding post-shuffle
+ *    [[ShuffledRowRDD]].
+ *    If this coordinator has made the decision on how to shuffle data, this [[ShuffleExchange]]
+ *    will immediately get its corresponding post-shuffle [[ShuffledRowRDD]].
+ *  - If this coordinator has not made the decision on how to shuffle data, it will ask those
+ *    registered [[ShuffleExchange]]s to submit their pre-shuffle stages. Then, based on the
+ *    size statistics of pre-shuffle partitions, this coordinator will determine the number of
+ *    post-shuffle partitions and pack multiple pre-shuffle partitions with continuous indices
+ *    into a single post-shuffle partition whenever necessary.
+ *  - Finally, this coordinator will create post-shuffle [[ShuffledRowRDD]]s for all registered
+ *    [[ShuffleExchange]]s. So, when a [[ShuffleExchange]] calls `postShuffleRDD`, this
+ *    coordinator can look up the corresponding [[RDD]].
+ *
+ * The strategy used to determine the number of post-shuffle partitions is described as follows.
+ * To determine the number of post-shuffle partitions, we have a target input size for a
+ * post-shuffle partition. Once we have size statistics of pre-shuffle partitions from stages
+ * corresponding to the registered [[ShuffleExchange]]s, we will do a pass over those statistics and
+ * pack pre-shuffle partitions with continuous indices into a single post-shuffle partition until
+ * the size of a post-shuffle partition is equal to or greater than the target size.
+ * For example, we have two stages with the following pre-shuffle partition size statistics:
+ * stage 1: [100 MB, 20 MB, 100 MB, 10 MB, 30 MB]
+ * stage 2: [10 MB,  10 MB, 70 MB,  5 MB, 5 MB]
+ * assuming the target input size is 128 MB, we will have three post-shuffle partitions,
+ * which are:
+ *  - post-shuffle partition 0: pre-shuffle partition 0 and 1
+ *  - post-shuffle partition 1: pre-shuffle partition 2
+ *  - post-shuffle partition 2: pre-shuffle partition 3 and 4
+ */
+private[sql] class ExchangeCoordinator(
+    numExchanges: Int,
+    advisoryTargetPostShuffleInputSize: Long,
+    minNumPostShufflePartitions: Option[Int] = None)
+  extends Logging {
+
+  // The registered Exchange operators.
+  private[this] val exchanges = ArrayBuffer[ShuffleExchange]()
+
+  // This map is used to look up the post-shuffle ShuffledRowRDD for an Exchange operator.
+  private[this] val postShuffleRDDs: JMap[ShuffleExchange, ShuffledRowRDD] =
+    new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+
+  // A boolean that indicates if this coordinator has made a decision on how to shuffle data.
+  // This variable will only be updated by doEstimationIfNecessary, which is protected by
+  // synchronized.
+  @volatile private[this] var estimated: Boolean = false
+
+  /**
+   * Registers a [[ShuffleExchange]] operator with this coordinator. This method is only allowed
+   * to be called in the `doPrepare` method of a [[ShuffleExchange]] operator.
+   */
+  @GuardedBy("this")
+  def registerExchange(exchange: ShuffleExchange): Unit = synchronized {
+    exchanges += exchange
+  }
+
+  def isEstimated: Boolean = estimated
+
+  /**
+   * Estimates partition start indices for post-shuffle partitions based on
+   * mapOutputStatistics provided by all pre-shuffle stages.
+   */
+  private[sql] def estimatePartitionStartIndices(
+      mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
+    // If we have mapOutputStatistics.length < numExchanges, it is because we do not submit
+    // a stage when the number of partitions of this dependency is 0.
+    assert(mapOutputStatistics.length <= numExchanges)
+
+    // If minNumPostShufflePartitions is defined, it is possible that we need to use a
+    // value less than advisoryTargetPostShuffleInputSize as the target input size of
+    // a post shuffle task.
+    val targetPostShuffleInputSize = minNumPostShufflePartitions match {
+      case Some(numPartitions) =>
+        val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
+        // The max here is to make sure that when we have an empty table, we
+        // only have a single post-shuffle partition.
+        // There is no particular reason that we pick 16. We just need a number to
+        // prevent maxPostShuffleInputSize from being set to 0.
+        val maxPostShuffleInputSize =
+          math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
+        math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)
+
+      case None => advisoryTargetPostShuffleInputSize
+    }
+
+    logInfo(
+      s"advisoryTargetPostShuffleInputSize: $advisoryTargetPostShuffleInputSize, " +
+      s"targetPostShuffleInputSize $targetPostShuffleInputSize.")
+
+    // Make sure we do get the same number of pre-shuffle partitions for those stages.
+    val distinctNumPreShufflePartitions =
+      mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
+    // The reason that we are expecting a single value of the number of pre-shuffle partitions
+    // is that when we add Exchanges, we set the number of pre-shuffle partitions
+    // (i.e. map output partitions) using a static setting, which is the value of
+    // spark.sql.shuffle.partitions. Even if two input RDDs have different
+    // numbers of partitions, they will have the same number of pre-shuffle partitions
+    // (i.e. map output partitions).
+    assert(
+      distinctNumPreShufflePartitions.length == 1,
+      "There should be only one distinct value of the number pre-shuffle partitions " +
+        "among registered Exchange operator.")
+    val numPreShufflePartitions = distinctNumPreShufflePartitions.head
+
+    val partitionStartIndices = ArrayBuffer[Int]()
+    // The first element of partitionStartIndices is always 0.
+    partitionStartIndices += 0
+
+    var postShuffleInputSize = 0L
+
+    var i = 0
+    while (i < numPreShufflePartitions) {
+      // We calculate the total size of the ith pre-shuffle partition from all pre-shuffle stages.
+      // Then, we add the total size to postShuffleInputSize.
+      var j = 0
+      while (j < mapOutputStatistics.length) {
+        postShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
+        j += 1
+      }
+
+      // If the current postShuffleInputSize is equal to or greater than the
+      // targetPostShuffleInputSize, we need to add a new element in partitionStartIndices.
+      if (postShuffleInputSize >= targetPostShuffleInputSize) {
+        if (i < numPreShufflePartitions - 1) {
+          // Next start index.
+          partitionStartIndices += i + 1
+        } else {
+          // This is the last element. So, we do not need to append the next start index to
+          // partitionStartIndices.
+        }
+        // reset postShuffleInputSize.
+        postShuffleInputSize = 0L
+      }
+
+      i += 1
+    }
+
+    partitionStartIndices.toArray
+  }
+
+  @GuardedBy("this")
+  private def doEstimationIfNecessary(): Unit = synchronized {
+    // It is unlikely that this method will be called from multiple threads
+    // (when multiple threads trigger the execution of THIS physical plan)
+    // because in common use cases, we will create a new physical plan after
+    // users apply operations (e.g. projection) to an existing DataFrame.
+    // However, if it happens, this method is synchronized to make sure only one
+    // thread will trigger the job submission.
+    if (!estimated) {
+      // Make sure we have the expected number of registered Exchange operators.
+      assert(exchanges.length == numExchanges)
+
+      val newPostShuffleRDDs = new JHashMap[ShuffleExchange, ShuffledRowRDD](numExchanges)
+
+      // Submit all map stages
+      val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
+      val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
+      var i = 0
+      while (i < numExchanges) {
+        val exchange = exchanges(i)
+        val shuffleDependency = exchange.prepareShuffleDependency()
+        shuffleDependencies += shuffleDependency
+        if (shuffleDependency.rdd.partitions.length != 0) {
+          // submitMapStage does not accept RDD with 0 partition.
+          // So, we will not submit this dependency.
+          submittedStageFutures +=
+            exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
+        }
+        i += 1
+      }
+
+      // Wait for the submitted map stages to finish.
+      val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
+      var j = 0
+      while (j < submittedStageFutures.length) {
+        // This call is a blocking call. If the stage has not finished, we will wait here.
+        mapOutputStatistics(j) = submittedStageFutures(j).get()
+        j += 1
+      }
+
+      // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
+      // number of post-shuffle partitions.
+      val partitionStartIndices =
+        if (mapOutputStatistics.length == 0) {
+          None
+        } else {
+          Some(estimatePartitionStartIndices(mapOutputStatistics))
+        }
+
+      var k = 0
+      while (k < numExchanges) {
+        val exchange = exchanges(k)
+        val rdd =
+          exchange.preparePostShuffleRDD(shuffleDependencies(k), partitionStartIndices)
+        newPostShuffleRDDs.put(exchange, rdd)
+
+        k += 1
+      }
+
+      // Finally, we set postShuffleRDDs and estimated.
+      assert(postShuffleRDDs.isEmpty)
+      assert(newPostShuffleRDDs.size() == numExchanges)
+      postShuffleRDDs.putAll(newPostShuffleRDDs)
+      estimated = true
+    }
+  }
+
+  def postShuffleRDD(exchange: ShuffleExchange): ShuffledRowRDD = {
+    doEstimationIfNecessary()
+
+    if (!postShuffleRDDs.containsKey(exchange)) {
+      throw new IllegalStateException(
+        s"The given $exchange is not registered in this coordinator.")
+    }
+
+    postShuffleRDDs.get(exchange)
+  }
+
+  override def toString: String = {
+    s"coordinator[target post-shuffle partition size: $advisoryTargetPostShuffleInputSize]"
+  }
+}
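
For reference, the packing strategy described in the scaladoc can be sketched as a standalone
function (illustration only; plain Longs stand in for MapOutputStatistics) and checked against
the documented example:

    def estimateStartIndices(bytesByPartition: Seq[Array[Long]], targetSize: Long): Array[Int] = {
      val numPreShufflePartitions = bytesByPartition.head.length
      val startIndices = scala.collection.mutable.ArrayBuffer(0)
      var currentSize = 0L
      for (i <- 0 until numPreShufflePartitions) {
        // Total size of the ith pre-shuffle partition across all stages.
        currentSize += bytesByPartition.map(_(i)).sum
        if (currentSize >= targetSize) {
          if (i < numPreShufflePartitions - 1) startIndices += i + 1
          currentSize = 0L
        }
      }
      startIndices.toArray
    }

    val mb = 1L << 20
    val stage1 = Array(100 * mb, 20 * mb, 100 * mb, 10 * mb, 30 * mb)
    val stage2 = Array(10 * mb, 10 * mb, 70 * mb, 5 * mb, 5 * mb)
    // Post-shuffle partitions start at pre-shuffle partitions 0, 2 and 3,
    // i.e. {0, 1}, {2} and {3, 4}, matching the example above.
    assert(estimateStartIndices(Seq(stage1, stage2), 128 * mb).sameElements(Array(0, 2, 3)))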


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org