You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/08 04:40:11 UTC

git commit: [SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator

Repository: spark
Updated Branches:
  refs/heads/master 4352a2fda -> 50561f439


[SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator

Hi all,
I want to submit a basic operator, Intersect.
For example, in SQL:
select * from table1
intersect
select * from table2
So I want to use this operator to support this functionality in Spark SQL.
This operator returns the intersection of the RDDs produced by its two child SparkPlans.
JIRA:https://issues.apache.org/jira/browse/SPARK-2235

Author: Yanjie Gao <ga...@163.com>
Author: YanjieGao <39...@qq.com>

Closes #1150 from YanjieGao/patch-5 and squashes the following commits:

4629afe [YanjieGao] reformat the code
bdc2ac0 [YanjieGao] reformat the code as Michael's suggestion
3b29ad6 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
1cfbfe6 [YanjieGao] refomat some files
ea78f33 [YanjieGao] resolve conflict and add annotation on basicOperator and remove HiveQl
0c7cca5 [YanjieGao] modify format problem
a802ca8 [YanjieGao] Merge remote branch 'upstream/master' into patch-5
5e374c7 [YanjieGao] resolve conflict in SparkStrategies and basicOperator
f7961f6 [Yanjie Gao] update the line less than
bdc4a05 [Yanjie Gao] Update basicOperators.scala
0b49837 [Yanjie Gao] delete the annotation
f1288b4 [Yanjie Gao] delete annotation
e2b64be [Yanjie Gao] Update basicOperators.scala
4dd453e [Yanjie Gao] Update SQLQuerySuite.scala
790765d [Yanjie Gao] Update SparkStrategies.scala
ac73e60 [Yanjie Gao] Update basicOperators.scala
d4ac5e5 [Yanjie Gao] Update HiveQl.scala
61e88e7 [Yanjie Gao] Update SqlParser.scala
469f099 [Yanjie Gao] Update basicOperators.scala
e5bff61 [Yanjie Gao] Spark SQL basicOperator add Intersect operator


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50561f43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50561f43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50561f43

Branch: refs/heads/master
Commit: 50561f4396be7d641feb2a7a54a374d294628231
Parents: 4352a2f
Author: Yanjie Gao <ga...@163.com>
Authored: Mon Jul 7 19:40:04 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jul 7 19:40:04 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/SqlParser.scala      |  2 ++
 .../sql/catalyst/plans/logical/basicOperators.scala    |  5 +++++
 .../apache/spark/sql/execution/SparkStrategies.scala   |  2 ++
 .../apache/spark/sql/execution/basicOperators.scala    | 13 +++++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala     | 11 +++++++++++
 5 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ecb1112..e5653c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
+  protected val INTERSECT = Keyword("INTERSECT")
   protected val EXCEPT = Keyword("EXCEPT")
 
 
@@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected lazy val query: Parser[LogicalPlan] = (
     select * (
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+        INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } |
         EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 0728fa7..1537de2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
 case object NoRelation extends LeafNode {
   override def output = Nil
 }
+
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+  override def output = left.output
+  override def references = Set.empty
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 9e036e1..7080074 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
       case logical.Except(left,right) =>                                        
         execution.Except(planLater(left),planLater(right)) :: Nil   
+      case logical.Intersect(left, right) =>
+        execution.Intersect(planLater(left), planLater(right)) :: Nil
       case logical.Generate(generator, join, outer, _, child) =>
         execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
       case logical.NoRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4b59e0b..e8816f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   }
 }
 
+/**
+ * :: DeveloperApi ::
+ * Returns the rows in left that also appear in right using the built in spark
+ * intersection function.
+ */
+@DeveloperApi
+case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+  override def output = children.head.output
+
+  override def execute() = {
+    left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5c6701e..fa1f32f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
   }
 
+ test("INTERSECT") {
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"),
+      (1, "a") ::
+      (2, "b") ::
+      (3, "c") ::
+      (4, "d") :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil)
+  }
+
   test("SET commands semantics using sql()") {
     TestSQLContext.settings.synchronized {
       clear()