You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/04 11:44:04 UTC

git commit: [SPARK-2234][SQL]Spark SQL basicOperators add Except operator

Repository: spark
Updated Branches:
  refs/heads/master b3e768e15 -> 5dadda864


[SPARK-2234][SQL]Spark SQL basicOperators  add Except operator

Hi all,
I want to submit a Except operator in basicOperators.scala
In SQL case.SQL support two table do except operator.
select * from table1
except
select * from table2
This operator support the substract function .Return an table with the elements from `this` that are not in `other`.This operator should limit the input SparkPlan Seq only has two member.The check will later support
JIRA:https://issues.apache.org/jira/browse/SPARK-2234

Author: Yanjie Gao <ga...@163.com>
Author: YanjieGao <39...@qq.com>
Author: root <root@node4.(none)>
Author: gaoyanjie <ga...@163.com>

Closes #1151 from YanjieGao/patch-6 and squashes the following commits:

f19f899 [YanjieGao] add a new blank line in basicoperators.scala
2ff7d73 [YanjieGao] resolve the identation in SqlParser and SparkStrategies
fdb5227 [YanjieGao] Merge remote branch 'upstream/master' into patch-6
9940d19 [YanjieGao] make comment less than 100c
09c7413 [YanjieGao] pr 1151 SqlParser add cache ,basic Operator rename Except and modify comment
b4b5867 [root] Merge remote branch 'upstream/master' into patch-6
b4c3869 [Yanjie Gao] change SparkStrategies Sparkcontext to SqlContext
7e0ec29 [Yanjie Gao] delete multi test
7e7c83f [Yanjie Gao] delete conflict except
b01beb8 [YanjieGao] resolve conflict sparkstrategies and basicOperators
4dc8166 [YanjieGao] resolve conflict
fa68a98 [Yanjie Gao] Update joins.scala
8e6bb00 [Yanjie Gao] delete conflict except
dd9ba5e [Yanjie Gao] Update joins.scala
a0d4e73 [Yanjie Gao] delete skew join
60f5ddd [Yanjie Gao] update less than 100c
0e72233 [Yanjie Gao] update SQLQuerySuite on master branch
7f916b5 [Yanjie Gao] update execution/basicOperators on master branch
a28dece [Yanjie Gao] Update logical/basicOperators on master branch
a639935 [Yanjie Gao] Update SparkStrategies.scala
3bf7def [Yanjie Gao] update SqlParser on master branch
26f833f [Yanjie Gao] update SparkStrategies.scala on master branch
8dd063f [Yanjie Gao] Update logical/basicOperators on master branch
9847dcf [Yanjie Gao] update SqlParser on masterbranch
d6a4604 [Yanjie Gao] Update joins.scala
424c507 [Yanjie Gao] Update joins.scala
7680742 [Yanjie Gao] Update SqlParser.scala
a7193d8 [gaoyanjie] [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151
5c8a224 [Yanjie Gao] update the line less than 100c
ee066b3 [Yanjie Gao] Update basicOperators.scala
32a80ab [Yanjie Gao] remove except in HiveQl
cf232eb [Yanjie Gao] update 1comment 2space3 left.out
f1ea3f3 [Yanjie Gao] remove comment
7ea9b91 [Yanjie Gao] remove annotation
7f3d613 [Yanjie Gao] update .map(_.copy())
670a1bb [Yanjie Gao] Update HiveQl.scala
3fe7746 [Yanjie Gao] Update SQLQuerySuite.scala
a36eb0a [Yanjie Gao] Update basicOperators.scala
7859e56 [Yanjie Gao] Update SparkStrategies.scala
052346d [Yanjie Gao] Subtract is conflict with Subtract(e1,e2)
aab3785 [Yanjie Gao] Update SQLQuerySuite.scala
4bf80b1 [Yanjie Gao] update subtract to except
4bdd520 [Yanjie Gao] Update SqlParser.scala
2d4bfbd [Yanjie Gao] Update SQLQuerySuite.scala
0808921 [Yanjie Gao] SQLQuerySuite
a8a1948 [Yanjie Gao] SparkStrategies
1fe96c0 [Yanjie Gao] HiveQl.scala update
3305e40 [Yanjie Gao] SqlParser
7a98c37 [Yanjie Gao] Update basicOperators.scala
cf5b9d0 [Yanjie Gao] Update basicOperators.scala
8945835 [Yanjie Gao] object SkewJoin extends Strategy
2b98962 [Yanjie Gao] Update SqlParser.scala
dd32980 [Yanjie Gao] update1
68815b2 [Yanjie Gao] Reformat the code style
4eb43ec [Yanjie Gao] Update basicOperators.scala
aa06072 [Yanjie Gao] Reformat the code sytle


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5dadda86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5dadda86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5dadda86

Branch: refs/heads/master
Commit: 5dadda86456e1d3918e320b83aec7e2f1352d95d
Parents: b3e768e
Author: Yanjie Gao <ga...@163.com>
Authored: Fri Jul 4 02:43:57 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Jul 4 02:43:57 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/catalyst/SqlParser.scala    |  3 +++
 .../sql/catalyst/plans/logical/basicOperators.scala  |  6 ++++++
 .../apache/spark/sql/execution/SparkStrategies.scala |  2 ++
 .../apache/spark/sql/execution/basicOperators.scala  | 15 +++++++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala   | 14 ++++++++++++++
 5 files changed, 40 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 61762fa..ecb1112 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -118,6 +118,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
+  protected val EXCEPT = Keyword("EXCEPT")
+
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -138,6 +140,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected lazy val query: Parser[LogicalPlan] = (
     select * (
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+        EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
     | insert | cache

http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 3e06398..bac5a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -89,6 +89,12 @@ case class Join(
   }
 }
 
+case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
+  def output = left.output
+
+  def references = Set.empty
+}
+
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],

http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0925605..9e036e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -273,6 +273,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Limit(limit, planLater(child))(sqlContext) :: Nil
       case Unions(unionChildren) =>
         execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil
+      case logical.Except(left,right) =>                                        
+        execution.Except(planLater(left),planLater(right)) :: Nil   
       case logical.Generate(generator, join, outer, _, child) =>
         execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil
       case logical.NoRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a278f1c..4b59e0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -205,3 +205,18 @@ object ExistingRdd {
 case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
   override def execute() = rdd
 }
+
+/**
+ * :: DeveloperApi ::
+ * Returns a table with the elements from left that are not in right using
+ * the built-in spark subtract function.
+ */
+@DeveloperApi
+case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
+  override def output = left.output
+
+  override def execute() = {
+    left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/5dadda86/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2c1cb18..5c6701e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -371,6 +371,20 @@ class SQLQuerySuite extends QueryTest {
         (3, null)))
   }
 
+  test("EXCEPT") {
+
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "),
+      (1, "a") ::
+      (2, "b") ::
+      (3, "c") ::
+      (4, "d") :: Nil)
+    checkAnswer(
+      sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil)
+    checkAnswer(
+      sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil)
+  }
+
   test("SET commands semantics using sql()") {
     TestSQLContext.settings.synchronized {
       clear()