You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/12 21:08:04 UTC

git commit: [SPARK-2441][SQL] Add more efficient distinct operator.

Repository: spark
Updated Branches:
  refs/heads/master 7a0135293 -> 7e26b5761


[SPARK-2441][SQL] Add more efficient distinct operator.

Author: Michael Armbrust <mi...@databricks.com>

Closes #1366 from marmbrus/partialDistinct and squashes the following commits:

12a31ab [Michael Armbrust] Add more efficient distinct operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e26b576
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e26b576
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e26b576

Branch: refs/heads/master
Commit: 7e26b57615f6c1d3f9058f9c19c05ec91f017f4c
Parents: 7a01352
Author: Michael Armbrust <mi...@databricks.com>
Authored: Sat Jul 12 12:07:27 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jul 12 12:07:27 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../spark/sql/execution/basicOperators.scala    | 33 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7080074..c078e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -247,8 +247,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
-        execution.Aggregate(
-          partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+        execution.Distinct(partial = false,
+          execution.Distinct(partial = true, planLater(child))) :: Nil
       case logical.Sort(sortExprs, child) =>
         // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
         execution.Sort(sortExprs, global = true, planLater(child)):: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 97abd63..966d8f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
 import org.apache.spark.util.MutablePair
 
 /**
@@ -248,6 +248,37 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows of each partition using a mutable HashSet.
+ * @param partial when true the distinct is computed per partition, without shuffling the data;
+ *                when false the child is required to be clustered on all its output columns.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+  override def output = child.output
+  // Partial: no requirement on the child. Final: child clustered on every output column.
+  override def requiredChildDistribution =
+    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+  // Each partition is fully consumed into the set before any distinct row is emitted.
+  override def execute() = {
+    child.execute().mapPartitions { iter =>
+      val hashSet = new scala.collection.mutable.HashSet[Row]()
+      // Copy rows only when first seen; presumably upstream may reuse the Row object (confirm).
+      var currentRow: Row = null
+      while (iter.hasNext) {
+        currentRow = iter.next()
+        if (!hashSet.contains(currentRow)) {
+          hashSet.add(currentRow.copy())
+        }
+      }
+      // Emit the distinct rows in the set's own iteration order.
+      hashSet.iterator
+    }
+  }
+}
+
 
 /**
  * :: DeveloperApi ::