You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/20 11:50:09 UTC

flink git commit: [FLINK-8095] [table] Introduce ProjectSetOpTransposeRule

Repository: flink
Updated Branches:
  refs/heads/master 691c48a14 -> 0a22acef4


[FLINK-8095] [table] Introduce ProjectSetOpTransposeRule

This closes #5026.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a22acef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a22acef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a22acef

Branch: refs/heads/master
Commit: 0a22acef41ede452b3df1a9874b5ae4a336d8a77
Parents: 691c48a
Author: Xpray <le...@gmail.com>
Authored: Fri Nov 17 11:01:27 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Mon Nov 20 12:49:13 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala      | 58 ++++++++++++++++++++
 .../api/stream/table/SetOperatorsTest.scala     | 29 ++++++++++
 .../plan/TimeIndicatorConversionTest.scala      | 16 ++++--
 4 files changed, 99 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a20d14f..10d6881 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -54,6 +54,8 @@ object FlinkRuleSets {
     FilterAggregateTransposeRule.INSTANCE,
     // push filter through set operation
     FilterSetOpTransposeRule.INSTANCE,
+    // push project through set operation
+    ProjectSetOpTransposeRule.INSTANCE,
 
     // aggregation and projection rules
     AggregateProjectMergeRule.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 35f4429..929ce9c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -215,4 +215,62 @@ class SetOperatorsTest extends TableTestBase {
 
     util.verifyTable(result, expected)
   }
+
+  @Test
+  def testProjectUnionTranspose(): Unit = {
+    val util = batchTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.select('a, 'b, 'c)
+                 .unionAll(right.select('a, 'b, 'c))
+                 .select('b, 'c)
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "b", "c")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(1),
+        term("select", "b", "c")
+      ),
+      term("union", "b", "c")
+    )
+
+    util.verifyTable(result, expected)
+
+  }
+
+  @Test
+  def testProjectMinusTranspose(): Unit = {
+    val util = batchTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.select('a, 'b, 'c)
+                 .minusAll(right.select('a, 'b, 'c))
+                 .select('b, 'c)
+
+    val expected = binaryNode(
+      "DataSetMinus",
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(0),
+        term("select", "b", "c")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        batchTableNode(1),
+        term("select", "b", "c")
+      ),
+      term("minus", "b", "c")
+    )
+
+    util.verifyTable(result, expected)
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
index b1b700b..c0fc05b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -65,4 +65,33 @@ class SetOperatorsTest extends TableTestBase {
 
     util.verifyTable(result, expected)
   }
+
+  @Test
+  def testProjectUnionTranspose(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.select('a, 'b, 'c)
+      .unionAll(right.select('a, 'b, 'c))
+      .select('b, 'c)
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "b", "c")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(1),
+        term("select", "b", "c")
+      ),
+      term("union all", "b", "c")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a22acef/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 009ae40..faca7f9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -209,15 +209,19 @@ class TimeIndicatorConversionTest extends TableTestBase {
 
     val result = t.unionAll(t).select('rowtime)
 
-    val expected = unaryNode(
-      "DataStreamCalc",
-      binaryNode(
-        "DataStreamUnion",
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
         streamTableNode(0),
+        term("select", "rowtime")
+      ),
+      unaryNode(
+        "DataStreamCalc",
         streamTableNode(0),
-        term("union all", "rowtime", "long", "int")
+        term("select", "rowtime")
       ),
-      term("select", "rowtime")
+      term("union all", "rowtime")
     )
 
     util.verifyTable(result, expected)