You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/20 11:50:09 UTC
flink git commit: [FLINK-8095] [table] Introduce
ProjectSetOpTransposeRule
Repository: flink
Updated Branches:
refs/heads/master 691c48a14 -> 0a22acef4
[FLINK-8095] [table] Introduce ProjectSetOpTransposeRule
This closes #5026.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22acef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22acef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22acef
Branch: refs/heads/master
Commit: 0a22acef41ede452b3df1a9874b5ae4a336d8a77
Parents: 691c48a
Author: Xpray <le...@gmail.com>
Authored: Fri Nov 17 11:01:27 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Nov 20 12:49:13 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 2 +
.../api/batch/table/SetOperatorsTest.scala | 58 ++++++++++++++++++++
.../api/stream/table/SetOperatorsTest.scala | 29 ++++++++++
.../plan/TimeIndicatorConversionTest.scala | 16 ++++--
4 files changed, 99 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a20d14f..10d6881 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -54,6 +54,8 @@ object FlinkRuleSets {
FilterAggregateTransposeRule.INSTANCE,
// push filter through set operation
FilterSetOpTransposeRule.INSTANCE,
+ // push project through set operation
+ ProjectSetOpTransposeRule.INSTANCE,
// aggregation and projection rules
AggregateProjectMergeRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 35f4429..929ce9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -215,4 +215,62 @@ class SetOperatorsTest extends TableTestBase {
util.verifyTable(result, expected)
}
+
+ @Test
+ def testProjectUnionTranspose(): Unit = {
+ val util = batchTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.select('a, 'b, 'c)
+ .unionAll(right.select('a, 'b, 'c))
+ .select('b, 'c)
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b", "c")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "b", "c")
+ ),
+ term("union", "b", "c")
+ )
+
+ util.verifyTable(result, expected)
+
+ }
+
+ @Test
+ def testProjectMinusTranspose(): Unit = {
+ val util = batchTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.select('a, 'b, 'c)
+ .minusAll(right.select('a, 'b, 'c))
+ .select('b, 'c)
+
+ val expected = binaryNode(
+ "DataSetMinus",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b", "c")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "b", "c")
+ ),
+ term("minus", "b", "c")
+ )
+
+ util.verifyTable(result, expected)
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index b1b700b..c0fc05b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -65,4 +65,33 @@ class SetOperatorsTest extends TableTestBase {
util.verifyTable(result, expected)
}
+
+ @Test
+ def testProjectUnionTranspose(): Unit = {
+ val util = streamTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.select('a, 'b, 'c)
+ .unionAll(right.select('a, 'b, 'c))
+ .select('b, 'c)
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "b", "c")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "b", "c")
+ ),
+ term("union all", "b", "c")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 009ae40..faca7f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -209,15 +209,19 @@ class TimeIndicatorConversionTest extends TableTestBase {
val result = t.unionAll(t).select('rowtime)
- val expected = unaryNode(
- "DataStreamCalc",
- binaryNode(
- "DataStreamUnion",
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
streamTableNode(0),
+ term("select", "rowtime")
+ ),
+ unaryNode(
+ "DataStreamCalc",
streamTableNode(0),
- term("union all", "rowtime", "long", "int")
+ term("select", "rowtime")
),
- term("select", "rowtime")
+ term("union all", "rowtime")
)
util.verifyTable(result, expected)