You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/08 15:55:20 UTC

flink git commit: [FLINK-2000][table] Add SQL-style Aggregation Support

Repository: flink
Updated Branches:
  refs/heads/master 804430bdf -> 7805db813


[FLINK-2000][table] Add SQL-style Aggregation Support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7805db81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7805db81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7805db81

Branch: refs/heads/master
Commit: 7805db813dd744f13776320d556e1cefa0351464
Parents: 804430b
Author: Cheng Hao <ch...@gmail.com>
Authored: Thu Jun 4 23:17:35 2015 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 8 15:37:15 2015 +0200

----------------------------------------------------------------------
 .../api/table/parser/ExpressionParser.scala     | 20 ++++++++++++++-----
 .../table/test/GroupedAggreagationsITCase.scala | 21 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index 1586f50..7bad7fe 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -43,6 +43,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
   // KeyWord
 
   lazy val AS: Keyword = Keyword("as")
+  lazy val SUM: Keyword = Keyword("sum") // SQL-style aggregate names, e.g. sum(a)
+  lazy val MIN: Keyword = Keyword("min")
+  lazy val MAX: Keyword = Keyword("max")
+  lazy val COUNT: Keyword = Keyword("count")
+  lazy val AVG: Keyword = Keyword("avg")
 
   // Literals
 
@@ -91,11 +96,16 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
 
   lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
 
-  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
-  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
-  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
-  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) }
-  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
+  lazy val sum: PackratParser[Expression] = // "a.sum" or "sum(a)"
+    ((atom <~ ".sum") | (SUM ~ "(" ~> atom <~ ")")) ^^ { operand => Sum(operand) }
+  lazy val min: PackratParser[Expression] = // "a.min" or "min(a)"
+    ((atom <~ ".min") | (MIN ~ "(" ~> atom <~ ")")) ^^ { operand => Min(operand) }
+  lazy val max: PackratParser[Expression] = // "a.max" or "max(a)"
+    ((atom <~ ".max") | (MAX ~ "(" ~> atom <~ ")")) ^^ { operand => Max(operand) }
+  lazy val count: PackratParser[Expression] = // "a.count" or "count(a)"
+    ((atom <~ ".count") | (COUNT ~ "(" ~> atom <~ ")")) ^^ { operand => Count(operand) }
+  lazy val avg: PackratParser[Expression] = // "a.avg" or "avg(a)"
+    ((atom <~ ".avg") | (AVG ~ "(" ~> atom <~ ")")) ^^ { operand => Avg(operand) }
 
   lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
     case e ~ _ ~ as ~ _ => Naming(e, as.name)

http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index d76d75c..5afd6ca 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -93,4 +93,25 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
     env.execute()
     expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
   }
+
+  @Test
+  def testSQLStyleAggregations(): Unit = {
+
+    // each aggregate is written in SQL function syntax (e.g. "Sum( a)", with varied
+    // spacing and capitalization) and in method syntax ("a.sum"); both must agree
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+      .select(
+        """Sum( a) as a1, a.sum as a2,
+          |Min (a) as b1, a.min as b2,
+          |Max (a ) as c1, a.max as c2,
+          |Avg ( a ) as d1, a.avg as d2,
+          |Count(a) as e1, a.count as e2
+        """.stripMargin)
+
+    ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+    env.execute()
+    expected = "231,231,1,1,21,21,11,11,21,21"
+  }
 }