You are viewing a plain text version of this content. The canonical link was a hyperlink in the original page and is not preserved in this plain-text view; see the Commit URL below for the canonical reference.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/20 14:29:43 UTC
flink git commit: [FLINK-7357] [table] Fix translation of group
window queries with window props and HAVING.
Repository: flink
Updated Branches:
refs/heads/release-1.3 100951e27 -> cc71dec10
[FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc71dec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc71dec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc71dec1
Branch: refs/heads/release-1.3
Commit: cc71dec108f28562bca5f99c53950a7be6d1ba54
Parents: 100951e
Author: Rong Rong <ro...@uber.com>
Authored: Thu Aug 10 10:46:25 2017 -0700
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 20 11:00:33 2017 +0200
----------------------------------------------------------------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 4 +-
.../common/WindowStartEndPropertiesRule.scala | 169 ++++++++++++-------
.../scala/batch/sql/WindowAggregateTest.scala | 42 +++++
.../table/api/scala/stream/sql/SqlITCase.scala | 51 ++++++
.../scala/stream/sql/WindowAggregateTest.scala | 39 +++++
5 files changed, 243 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index bb3833a..e11f3e0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -137,7 +137,8 @@ object FlinkRuleSets {
// Transform window to LogicalWindowAggregate
DataSetLogicalWindowAggregateRule.INSTANCE,
- WindowStartEndPropertiesRule.INSTANCE
+ WindowStartEndPropertiesRule.INSTANCE,
+ WindowStartEndPropertiesHavingRule.INSTANCE
)
/**
@@ -169,6 +170,7 @@ object FlinkRuleSets {
// Transform window to LogicalWindowAggregate
DataStreamLogicalWindowAggregateRule.INSTANCE,
WindowStartEndPropertiesRule.INSTANCE,
+ WindowStartEndPropertiesHavingRule.INSTANCE,
// simplify expressions rules
ReduceExpressionsRule.FILTER_INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 14e9b21..33190e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -18,20 +18,19 @@
package org.apache.flink.table.plan.rules.common
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
import org.apache.calcite.rex.{RexCall, RexNode}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
import scala.collection.JavaConversions._
-class WindowStartEndPropertiesRule
- extends RelOptRule(
- WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
- "WindowStartEndPropertiesRule") {
+abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
+ extends RelOptRule(rulePredicate, ruleName) {
override def matches(call: RelOptRuleCall): Boolean = {
val project = call.rel(0).asInstanceOf[LogicalProject]
@@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule
project.getProjects.exists(hasGroupAuxiliaries)
}
- override def onMatch(call: RelOptRuleCall): Unit = {
-
- val project = call.rel(0).asInstanceOf[LogicalProject]
- val innerProject = call.rel(1).asInstanceOf[LogicalProject]
- val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
- // Retrieve window start and end properties
- val transformed = call.builder()
- val rexBuilder = transformed.getRexBuilder
- transformed.push(LogicalWindowAggregate.create(
- agg.getWindow,
- Seq(
- NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
- NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
- ), agg)
- )
-
- // forward window start and end properties
- transformed.project(
- innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
-
- def replaceGroupAuxiliaries(node: RexNode): RexNode = {
- node match {
- case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
- // replace expression by access to window start
- rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
- case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
- // replace expression by access to window end
- rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
- case c: RexCall =>
- // replace expressions in children
- val newOps = c.getOperands.map(replaceGroupAuxiliaries)
- c.clone(c.getType, newOps)
- case x =>
- // preserve expression
- x
- }
+ def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = {
+ val rexBuilder = relBuilder.getRexBuilder
+ node match {
+ case c: RexCall if isWindowStart(c) =>
+ // replace expression by access to window start
+ rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
+ case c: RexCall if isWindowEnd(c) =>
+ // replace expression by access to window end
+ rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
+ case c: RexCall =>
+ // replace expressions in children
+ val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder))
+ c.clone(c.getType, newOps)
+ case x =>
+ // preserve expression
+ x
}
-
- // replace window auxiliary function by access to window properties
- transformed.project(
- project.getProjects.map(replaceGroupAuxiliaries)
- )
- val res = transformed.build()
- call.transformTo(res)
}
-}
-
-object WindowStartEndPropertiesRule {
- private val WINDOW_EXPRESSION_RULE_PREDICATE =
- RelOptRule.operand(classOf[LogicalProject],
- RelOptRule.operand(classOf[LogicalProject],
- RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
-
- val INSTANCE = new WindowStartEndPropertiesRule
/** Checks if a RexNode is a window start auxiliary function. */
private def isWindowStart(node: RexNode): Boolean = {
@@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule {
case SqlStdOperatorTable.TUMBLE_START |
SqlStdOperatorTable.HOP_START |
SqlStdOperatorTable.SESSION_START
- => true
+ => true
case _ => false
}
case _ => false
@@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule {
case SqlStdOperatorTable.TUMBLE_END |
SqlStdOperatorTable.HOP_END |
SqlStdOperatorTable.SESSION_END
- => true
+ => true
case _ => false
}
case _ => false
}
}
}
+
+object WindowStartEndPropertiesRule {
+
+ val INSTANCE = new WindowStartEndPropertiesBaseRule(
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))),
+ "WindowStartEndPropertiesRule") {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+ val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+ // Retrieve window start and end properties
+ val builder = call.builder()
+ builder.push(LogicalWindowAggregate.create(
+ agg.getWindow,
+ Seq(
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+ agg)
+ )
+
+ // forward window start and end properties
+ builder.project(
+ innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+ // replace window auxiliary function by access to window properties
+ builder.project(
+ project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+ val res = builder.build()
+ call.transformTo(res)
+ }
+ }
+}
+
+object WindowStartEndPropertiesHavingRule {
+
+ val INSTANCE = new WindowStartEndPropertiesBaseRule(
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalFilter],
+ RelOptRule.operand(classOf[LogicalProject],
+ RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))),
+ "WindowStartEndPropertiesHavingRule") {
+
+ override def onMatch(call: RelOptRuleCall): Unit = {
+
+ val project = call.rel(0).asInstanceOf[LogicalProject]
+ val filter = call.rel(1).asInstanceOf[LogicalFilter]
+ val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+ val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+ // Retrieve window start and end properties
+ val builder = call.builder()
+ builder.push(LogicalWindowAggregate.create(
+ agg.getWindow,
+ Seq(
+ NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+ NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+ agg)
+ )
+
+ // forward window start and end properties
+ builder.project(
+ innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+ // replace window auxiliary function by access to window properties
+ builder.filter(
+ filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+
+ // replace window auxiliary function by access to window properties
+ builder.project(
+ project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+ )
+
+ val res = builder.build()
+ call.transformTo(res)
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index 328c03c..a0afe5e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -335,4 +335,46 @@ class WindowAggregateTest extends TableTestBase {
"GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
util.verifySql(sql, "n/a")
}
+
+ @Test
+ def testExpressionOnWindowHavingFunction() = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "FROM T " +
+ "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "HAVING " +
+ " SUM(a) > 0 AND " +
+ " QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetWindowAggregate",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "ts, a")
+ ),
+ term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0",
+ "SUM(a) AS $f1",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end")
+ ),
+ term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+ term("where",
+ "AND(>($f1, 0), " +
+ "=(+(/INT(-(EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret(CAST(w$start)), 86400000)), " +
+ "1), 3), 1), 1))")
+ )
+
+ util.verifySql(sql, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 55633ff..834c243 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -19,6 +19,7 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
@@ -26,6 +27,7 @@ import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestDa
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.scala.stream.sql.OverWindowITCase.EventTimeSourceFunction
import org.apache.flink.types.Row
import org.junit.Assert._
import org.junit._
@@ -315,5 +317,54 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testHopStartEndWithHaving(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQueryHopStartEndWithHaving =
+ """
+ |SELECT
+ | c AS k,
+ | COUNT(a) AS v,
+ | HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart,
+ | HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd
+ |FROM T1
+ |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
+ |HAVING
+ | SUM(b) > 1 AND
+ | QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
+ """.stripMargin
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Right(14000010L),
+ Left(8640000000L, (4, 1L, "Hello")), // data for the quarter to validate having filter
+ Left(8640000001L, (4, 1L, "Hello")),
+ Right(8640000010L)
+ )
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+ resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
+
+ env.execute()
+
+ val expected = List(
+ "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cc71dec1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f95d0ab..f7735ef 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -224,4 +224,43 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, "n/a")
}
+
+ @Test
+ def testExpressionOnWindowHavingFunction() = {
+ val sql =
+ "SELECT " +
+ " COUNT(*), " +
+ " HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "FROM MyTable " +
+ "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+ "HAVING " +
+ " SUM(a) > 0 AND " +
+ " QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "rowtime, a")
+ ),
+ term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0",
+ "SUM(a) AS $f1",
+ "start('w$) AS w$start",
+ "end('w$) AS w$end")
+ ),
+ term("select", "EXPR$0", "w$start"),
+ term("where",
+ "AND(>($f1, 0), " +
+ "=(+(/INT(-(EXTRACT_DATE(FLAG(MONTH), /INT(Reinterpret(w$start), 86400000)), 1), " +
+ "3), 1), 1))")
+ )
+
+ streamUtil.verifySql(sql, expected)
+ }
}