You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/06/08 15:55:20 UTC
flink git commit: [FLINK-2000][table] Add SQL-style Aggregation Support
Repository: flink
Updated Branches:
refs/heads/master 804430bdf -> 7805db813
[FLINK-2000][table] Add SQL-style Aggregation Support
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7805db81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7805db81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7805db81
Branch: refs/heads/master
Commit: 7805db813dd744f13776320d556e1cefa0351464
Parents: 804430b
Author: Cheng Hao <ch...@gmail.com>
Authored: Thu Jun 4 23:17:35 2015 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 8 15:37:15 2015 +0200
----------------------------------------------------------------------
.../api/table/parser/ExpressionParser.scala | 20 ++++++++++++++-----
.../table/test/GroupedAggreagationsITCase.scala | 21 ++++++++++++++++++++
2 files changed, 36 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index 1586f50..7bad7fe 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -43,6 +43,11 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// KeyWord
lazy val AS: Keyword = Keyword("as")
+ lazy val COUNT: Keyword = Keyword("count")
+ lazy val AVG: Keyword = Keyword("avg")
+ lazy val MIN: Keyword = Keyword("min")
+ lazy val MAX: Keyword = Keyword("max")
+ lazy val SUM: Keyword = Keyword("sum")
// Literals
@@ -91,11 +96,16 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
- lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
- lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
- lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
- lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) }
- lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }
+ lazy val sum: PackratParser[Expression] =
+ (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => Sum(e) })
+ lazy val min: PackratParser[Expression] =
+ (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => Min(e) })
+ lazy val max: PackratParser[Expression] =
+ (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => Max(e) })
+ lazy val count: PackratParser[Expression] =
+ (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { e => Count(e) })
+ lazy val avg: PackratParser[Expression] =
+ (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => Avg(e) })
lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
case e ~ _ ~ as ~ _ => Naming(e, as.name)
http://git-wip-us.apache.org/repos/asf/flink/blob/7805db81/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
index d76d75c..5afd6ca 100644
--- a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggreagationsITCase.scala
@@ -93,4 +93,25 @@ class GroupedAggreagationsITCase(mode: TestExecutionMode) extends MultipleProgra
env.execute()
expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
}
+
+ @Test
+ def testSQLStyleAggregations(): Unit = {
+
+ // the grouping key needs to be forwarded to the intermediate DataSet, even
+ // if we don't want the key in the output
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+ .select(
+ """Sum( a) as a1, a.sum as a2,
+ |Min (a) as b1, a.min as b2,
+ |Max (a ) as c1, a.max as c2,
+ |Avg ( a ) as d1, a.avg as d2,
+ |Count(a) as e1, a.count as e2
+ """.stripMargin)
+
+ ds.writeAsText(resultPath, WriteMode.OVERWRITE)
+ env.execute()
+ expected = "231,231,1,1,21,21,11,11,21,21"
+ }
}