You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/10 03:23:49 UTC

git commit: [SPARK-3176] Implement 'ABS' and 'LAST' for sql

Repository: spark
Updated Branches:
  refs/heads/master 02b5ac719 -> 07ee4a28c


[SPARK-3176] Implement 'ABS' and 'LAST' for sql

Add support for the mathematical function "ABS" and the analytic function "LAST" to return a subset of the rows satisfying a query within Spark SQL. Test cases included.

Author: xinyunh <xi...@huawei.com>
Author: bomeng <golf8lover>

Closes #2099 from xinyunh/sqlTest and squashes the following commits:

71d15e7 [xinyunh] remove POWER part
8843643 [xinyunh] fix the code style issue
39f0309 [bomeng] Modify the code of POWER and ABS. Move them to the file arithmetic
ff8e51e [bomeng] add abs() function support
7f6980a [xinyunh] fix the bug in 'Last' component
b3df91b [xinyunh] add 'Last' component


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07ee4a28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07ee4a28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07ee4a28

Branch: refs/heads/master
Commit: 07ee4a28c3a502121770f301316cb2256e8f0ce2
Parents: 02b5ac7
Author: xinyunh <xi...@huawei.com>
Authored: Tue Sep 9 16:55:39 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Sep 9 16:55:39 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |  4 +++
 .../apache/spark/sql/catalyst/dsl/package.scala |  1 +
 .../sql/catalyst/expressions/aggregates.scala   | 28 ++++++++++++++++++++
 .../sql/catalyst/expressions/arithmetic.scala   | 15 +++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 23 ++++++++++++++--
 5 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07ee4a28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
old mode 100644
new mode 100755
index bfc197c..a04b4a9
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -82,6 +82,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val DISTINCT = Keyword("DISTINCT")
   protected val FALSE = Keyword("FALSE")
   protected val FIRST = Keyword("FIRST")
+  protected val LAST = Keyword("LAST")
   protected val FROM = Keyword("FROM")
   protected val FULL = Keyword("FULL")
   protected val GROUP = Keyword("GROUP")
@@ -125,6 +126,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val SUBSTR = Keyword("SUBSTR")
   protected val SUBSTRING = Keyword("SUBSTRING")
   protected val SQRT = Keyword("SQRT")
+  protected val ABS = Keyword("ABS")
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -315,6 +317,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
       case s ~ _ ~ _ ~ _ ~ _ ~ e => ApproxCountDistinct(e, s.toDouble)
     } |
     FIRST ~> "(" ~> expression <~ ")" ^^ { case exp => First(exp) } |
+    LAST ~> "(" ~> expression <~ ")" ^^ { case exp => Last(exp) } |
     AVG ~> "(" ~> expression <~ ")" ^^ { case exp => Average(exp) } |
     MIN ~> "(" ~> expression <~ ")" ^^ { case exp => Min(exp) } |
     MAX ~> "(" ~> expression <~ ")" ^^ { case exp => Max(exp) } |
@@ -330,6 +333,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
       case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l)
     } |
     SQRT ~> "(" ~> expression <~ ")" ^^ { case exp => Sqrt(exp) } |
+    ABS ~> "(" ~> expression <~ ")" ^^ { case exp => Abs(exp) } |
     ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
       case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/07ee4a28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
old mode 100644
new mode 100755
index f44521d..deb622c
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -132,6 +132,7 @@ package object dsl {
     def approxCountDistinct(e: Expression, rsd: Double = 0.05) = ApproxCountDistinct(e, rsd)
     def avg(e: Expression) = Average(e)
     def first(e: Expression) = First(e)
+    def last(e: Expression) = Last(e)
     def min(e: Expression) = Min(e)
     def max(e: Expression) = Max(e)
     def upper(e: Expression) = Upper(e)

http://git-wip-us.apache.org/repos/asf/spark/blob/07ee4a28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
old mode 100644
new mode 100755
index 15560a2..1b4d892
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -344,6 +344,21 @@ case class First(child: Expression) extends PartialAggregate with trees.UnaryNod
   override def newInstance() = new FirstFunction(child, this)
 }
 
+case class Last(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
+  override def references = child.references
+  override def nullable = true
+  override def dataType = child.dataType
+  override def toString = s"LAST($child)"
+
+  override def asPartial: SplitEvaluation = {
+    val partialLast = Alias(Last(child), "PartialLast")()
+    SplitEvaluation(
+      Last(partialLast.toAttribute),
+      partialLast :: Nil)
+  }
+  override def newInstance() = new LastFunction(child, this)
+}
+
 case class AverageFunction(expr: Expression, base: AggregateExpression)
   extends AggregateFunction {
 
@@ -489,3 +504,16 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends Ag
 
   override def eval(input: Row): Any = result
 }
+
+case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
+  def this() = this(null, null) // Required for serialization.
+
+  var result: Any = null
+
+  override def update(input: Row): Unit = {
+    result = input
+  }
+
+  override def eval(input: Row): Any =  if (result != null) expr.eval(result.asInstanceOf[Row])
+                                        else null
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/07ee4a28/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index f988fb0..fe825fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.types._
+import scala.math.pow
 
 case class UnaryMinus(child: Expression) extends UnaryExpression {
   type EvaluatedType = Any
@@ -129,3 +130,17 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
 
   override def toString = s"MaxOf($left, $right)"
 }
+
+/**
+ * A function that gets the absolute value of a numeric value.
+ */
+case class Abs(child: Expression) extends UnaryExpression  {
+  type EvaluatedType = Any
+
+  def dataType = child.dataType
+  override def foldable = child.foldable
+  def nullable = child.nullable
+  override def toString = s"Abs($child)"
+
+  override def eval(input: Row): Any = n1(child, input, _.abs(_))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/07ee4a28/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 739c12f..514ac54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -41,6 +41,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
 
+  test("SPARK-3176 Added Parser of SQL ABS()") {
+    checkAnswer(
+      sql("SELECT ABS(-1.3)"),
+      1.3)
+    checkAnswer(
+      sql("SELECT ABS(0.0)"),
+      0.0)
+    checkAnswer(
+      sql("SELECT ABS(2.5)"),
+      2.5)
+  }
+
+  test("SPARK-3176 Added Parser of SQL LAST()") {
+    checkAnswer(
+      sql("SELECT LAST(n) FROM lowerCaseData"),
+      4)
+  }
+
+
   test("SPARK-2041 column name equals tablename") {
     checkAnswer(
       sql("SELECT tableName FROM tableName"),
@@ -53,14 +72,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq
     )
   }
-  
+
   test("SQRT with automatic string casts") {
     checkAnswer(
       sql("SELECT SQRT(CAST(key AS STRING)) FROM testData"),
       (1 to 100).map(x => Row(math.sqrt(x.toDouble))).toSeq
     )
   }
-  
+
   test("SPARK-2407 Added Parser of SQL SUBSTR()") {
     checkAnswer(
       sql("SELECT substr(tableName, 1, 2) FROM tableName"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org