You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/12 21:08:04 UTC
git commit: [SPARK-2441][SQL] Add more efficient distinct operator.
Repository: spark
Updated Branches:
refs/heads/master 7a0135293 -> 7e26b5761
[SPARK-2441][SQL] Add more efficient distinct operator.
Author: Michael Armbrust <mi...@databricks.com>
Closes #1366 from marmbrus/partialDistinct and squashes the following commits:
12a31ab [Michael Armbrust] Add more efficient distinct operator.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e26b576
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e26b576
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e26b576
Branch: refs/heads/master
Commit: 7e26b57615f6c1d3f9058f9c19c05ec91f017f4c
Parents: 7a01352
Author: Michael Armbrust <mi...@databricks.com>
Authored: Sat Jul 12 12:07:27 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Jul 12 12:07:27 2014 -0700
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 4 +--
.../spark/sql/execution/basicOperators.scala | 33 +++++++++++++++++++-
2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7080074..c078e71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -247,8 +247,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.Distinct(child) =>
- execution.Aggregate(
- partial = false, child.output, child.output, planLater(child))(sqlContext) :: Nil
+ execution.Distinct(partial = false,
+ execution.Distinct(partial = true, planLater(child))) :: Nil
case logical.Sort(sortExprs, child) =>
// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil
http://git-wip-us.apache.org/repos/asf/spark/blob/7e26b576/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 97abd63..966d8f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical.{OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
/**
@@ -248,6 +248,37 @@ object ExistingRdd {
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
override def execute() = rdd
}
+/**
+ * :: DeveloperApi ::
+ * Computes the set of distinct input rows using a HashSet built separately for each partition.
+ * @param partial when true the distinct operation is performed partially, per partition, without
+ * shuffling the data; when false the input must be clustered by all child output attributes.
+ * @param child the input query plan.
+ */
+@DeveloperApi
+case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
+ override def output = child.output
+
+ override def requiredChildDistribution =
+ if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
+
+ override def execute() = {
+ child.execute().mapPartitions { iter =>
+ val hashSet = new scala.collection.mutable.HashSet[Row]() // buffers the partition's distinct rows
+
+ var currentRow: Row = null
+ while (iter.hasNext) {
+ currentRow = iter.next()
+ if (!hashSet.contains(currentRow)) {
+ hashSet.add(currentRow.copy()) // NOTE(review): copy presumably guards against upstream row reuse; confirm
+ }
+ }
+
+ hashSet.iterator
+ }
+ }
+}
+
/**
* :: DeveloperApi ::