Posted to commits@spark.apache.org by yh...@apache.org on 2015/10/08 00:53:45 UTC

spark git commit: [SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator

Repository: spark
Updated Branches:
  refs/heads/master 37526aca2 -> 7e2e26828


[SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator

This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
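
For reference, the user-facing entry point is `DataFrame.repartition(numPartitions)`. A minimal usage sketch of the new plan shape (the DataFrame and partition count are illustrative, not from the patch):

    // With this patch, repartition(10) is planned as
    // Exchange(RoundRobinPartitioning(10), child) rather than the old
    // execution.Repartition operator, so UnsafeRows pass through the
    // shuffle without conversion.
    val df = sqlContext.range(0, 1000)
    val repartitioned = df.repartition(10)
    repartitioned.explain()  // physical plan should show the Exchange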

Author: Josh Rosen <jo...@databricks.com>
Author: Liang-Chi Hsieh <vi...@appier.com>

Closes #8083 from JoshRosen/SPARK-9702.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e2e2682
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e2e2682
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e2e2682

Branch: refs/heads/master
Commit: 7e2e268289828ae664622c59b90d82938d957ff3
Parents: 37526ac
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Oct 7 15:53:37 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Oct 7 15:53:37 2015 -0700

----------------------------------------------------------------------
 .../catalyst/plans/physical/partitioning.scala    | 16 ++++++++++++++++
 .../org/apache/spark/sql/execution/Exchange.scala | 16 +++++++++++++---
 .../spark/sql/execution/SparkStrategies.scala     |  6 +++++-
 .../spark/sql/execution/basicOperators.scala      | 18 +++++++++---------
 4 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e2e2682/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 5ac3f1f..86b9417 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -194,6 +194,22 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
   override def guarantees(other: Partitioning): Boolean = false
 }
 
+/**
+ * Represents a partitioning where rows are distributed evenly across output partitions
+ * by starting from a random target partition number and distributing rows in a round-robin
+ * fashion. This partitioning is used when implementing the DataFrame.repartition() operator.
+ */
+case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
+  override def satisfies(required: Distribution): Boolean = required match {
+    case UnspecifiedDistribution => true
+    case _ => false
+  }
+
+  override def compatibleWith(other: Partitioning): Boolean = false
+
+  override def guarantees(other: Partitioning): Boolean = false
+}
+
 case object SinglePartition extends Partitioning {
   val numPartitions = 1
 

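Note on the contract above: `RoundRobinPartitioning` satisfies only
`UnspecifiedDistribution` and never guarantees or is compatible with any other
partitioning, so the planner can never treat round-robin output as already
meeting a clustering or ordering requirement. A minimal sketch using the
catalyst types above (the partition count is illustrative):

    import org.apache.spark.sql.catalyst.plans.physical._

    val p = RoundRobinPartitioning(numPartitions = 8)
    p.satisfies(UnspecifiedDistribution)  // true: nothing is required
    p.satisfies(AllTuples)                // false: rows are spread across partitions
    p.guarantees(p)                       // false: no placement promises, even to itself
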
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2e2682/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 029f226..8efa471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Random
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
-import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv}
+import org.apache.spark._
 
 /**
  * :: DeveloperApi ::
@@ -130,7 +132,6 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
 
   private val serializer: Serializer = {
-    val rowDataTypes = child.output.map(_.dataType).toArray
     if (tungstenMode) {
       new UnsafeRowSerializer(child.output.size)
     } else {
@@ -141,6 +142,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") {
     val rdd = child.execute()
     val part: Partitioner = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
       case HashPartitioning(expressions, numPartitions) => new HashPartitioner(numPartitions)
       case RangePartitioning(sortingExpressions, numPartitions) =>
         // Internally, RangePartitioner runs a job on the RDD that samples keys to compute
@@ -162,7 +164,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
       // TODO: Handle BroadcastPartitioning.
     }
-    def getPartitionKeyExtractor(): InternalRow => InternalRow = newPartitioning match {
+    def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
+      case RoundRobinPartitioning(numPartitions) =>
+        // Distributes elements evenly across output partitions, starting from a random partition.
+        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
+        (row: InternalRow) => {
+          // The HashPartitioner will handle the `mod` by the number of partitions
+          position += 1
+          position
+        }
       case HashPartitioning(expressions, _) => newMutableProjection(expressions, child.output)()
       case RangePartitioning(_, _) | SinglePartition => identity
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")

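To make the round-robin mechanics concrete, here is a self-contained simulation
of what the key extractor and `HashPartitioner` do together for a single task
(plain Scala, independent of Spark; the task id, row count, and partition count
are illustrative):

    import java.util.Random

    // Each task seeds the Random with its own partitionId, so different
    // tasks start writing at different target partitions and the output
    // stays balanced across tasks.
    val numPartitions = 4
    val taskPartitionId = 3
    var position = new Random(taskPartitionId).nextInt(numPartitions)

    // The extractor returns an ever-increasing Int as the "key";
    // HashPartitioner maps it to key.hashCode mod numPartitions, which
    // for a non-negative Int is simply position % numPartitions.
    val targets = (1 to 8).map { _ =>
      position += 1
      position % numPartitions
    }
    // `targets` cycles through all four partitions in order, starting
    // one past the random offset, e.g. 2, 3, 0, 1, 2, 3, 0, 1.
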
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2e2682/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index b078c8b..d1bbf2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -336,7 +336,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
-        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
+        if (shuffle) {
+          execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
+        } else {
+          execution.Coalesce(numPartitions, planLater(child)) :: Nil
+        }
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.

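The two branches map onto the two user-facing DataFrame operations (a hedged
usage sketch; `df` and the partition counts are placeholders):

    // df.repartition(n) produces logical.Repartition(n, shuffle = true, ...)
    //   -> planned as Exchange(RoundRobinPartitioning(n), ...)
    // df.coalesce(n) produces logical.Repartition(n, shuffle = false, ...)
    //   -> planned as Coalesce(n, ...), a narrow dependency with no shuffle
    val shuffled = df.repartition(200)
    val narrowed = df.coalesce(10)
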
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2e2682/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 3e49e0a..d4bbbeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -17,21 +17,20 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Random
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD}
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.collection.ExternalSorter
-import org.apache.spark.util.collection.unsafe.sort.PrefixComparator
 import org.apache.spark.util.random.PoissonSampler
-import org.apache.spark.util.{CompletionIterator, MutablePair}
+import org.apache.spark.util.MutablePair
 import org.apache.spark.{HashPartitioner, SparkEnv}
 
 /**
@@ -279,10 +278,12 @@ case class TakeOrderedAndProject(
 /**
  * :: DeveloperApi ::
  * Return a new RDD that has exactly `numPartitions` partitions.
+ * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g.
+ * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
+ * the 100 new partitions will claim 10 of the current partitions.
  */
 @DeveloperApi
-case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
-  extends UnaryNode {
+case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {
@@ -291,11 +292,10 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
+    child.execute().map(_.copy()).coalesce(numPartitions, shuffle = false)
   }
 }
 
-
 /**
  * :: DeveloperApi ::
  * Returns a table with the elements from left that are not in right using

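The narrow-dependency behavior that the `Coalesce` scaladoc describes can also
be observed at the RDD level (a minimal core-API sketch; `sc` and the sizes
are illustrative):

    // Shrinking 1000 partitions to 100 without a shuffle: each output
    // partition claims roughly 10 of the existing ones.
    val rdd = sc.parallelize(1 to 1000000, numSlices = 1000)
    val coalesced = rdd.coalesce(100)  // shuffle = false by default
    assert(coalesced.partitions.length == 100)
    println(coalesced.toDebugString)   // lineage shows no ShuffledRDD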

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org