Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/16 15:15:33 UTC

flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule

Repository: flink
Updated Branches:
  refs/heads/master cd1fbc078 -> 81dc260dc


[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d

Branch: refs/heads/master
Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e
Parents: cd1fbc0
Author: Xpray <le...@gmail.com>
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 16 14:43:50 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../api/batch/table/SetOperatorsTest.scala      | 80 ++++++++++++++++++++
 .../api/stream/table/SetOperatorsTest.scala     | 68 +++++++++++++++++
 3 files changed, 150 insertions(+)
----------------------------------------------------------------------
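
For context: FilterSetOpTransposeRule is Calcite's rule for rewriting a Filter on top of a set operation (union, minus, intersect) into one Filter per input, so rows are dropped before the set operation is computed rather than after it. Below is a minimal, hypothetical batch Table API sketch of a query that benefits from the rewrite; the object name and example data are illustrative and not part of this commit.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

// Illustrative sketch only. With FilterSetOpTransposeRule enabled, the
// predicate 'a > 0 should be pushed below the union, so each input is
// filtered before the DataSetUnion is evaluated.
object FilterUnionTransposeExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val left = env.fromElements((1, 1L, "x"), (-1, 2L, "y")).toTable(tEnv, 'a, 'b, 'c)
    val right = env.fromElements((2, 3L, "z"), (-2, 4L, "w")).toTable(tEnv, 'a, 'b, 'c)

    val result = left.unionAll(right).where('a > 0)

    // The optimized plan should show the ">(a, 0)" condition once per
    // union input rather than once above the union.
    println(tEnv.explain(result))
  }
}

The new tests below assert exactly this plan shape for both unionAll and minusAll, in the batch and streaming Table API.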


http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
     FilterJoinRule.JOIN,
     // push filter through an aggregation
     FilterAggregateTransposeRule.INSTANCE,
+    // push filter through set operation
+    FilterSetOpTransposeRule.INSTANCE,
 
     // aggregation and projection rules
     AggregateProjectMergeRule.INSTANCE,
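
The rule itself ships with Calcite (org.apache.calcite.rel.rules.FilterSetOpTransposeRule), so the change here is just registering its singleton INSTANCE next to the existing filter push-down rules. A standalone sketch of the same kind of registration, assuming only a Calcite dependency on the classpath (the object and value names are illustrative):

import org.apache.calcite.rel.rules.{FilterAggregateTransposeRule, FilterSetOpTransposeRule}
import org.apache.calcite.tools.{RuleSet, RuleSets}

// Illustrative sketch: a Calcite RuleSet containing the newly registered
// rule alongside an existing filter push-down rule.
object FilterPushDownRules {
  val RULES: RuleSet = RuleSets.ofList(
    FilterAggregateTransposeRule.INSTANCE, // push filter through an aggregation
    FilterSetOpTransposeRule.INSTANCE      // push filter through union / minus / intersect
  )
}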

http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
 
     util.verifyJavaTable(in, expected)
   }
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+    val util = batchTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.unionAll(right)
+      .where('a > 0)
+      .groupBy('b)
+      .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        binaryNode(
+          "DataSetUnion",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "a", "b", "c"),
+            term("where", ">(a, 0)")
+          ),
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(1),
+            term("select", "a", "b", "c"),
+            term("where", ">(a, 0)")
+          ),
+          term("union", "a", "b", "c")
+        ),
+        term("groupBy", "b"),
+        term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+      ),
+      term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testFilterMinusTranspose(): Unit = {
+    val util = batchTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.minusAll(right)
+      .where('a > 0)
+      .groupBy('b)
+      .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetAggregate",
+        binaryNode(
+          "DataSetMinus",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "a", "b", "c"),
+            term("where", ">(a, 0)")
+          ),
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(1),
+            term("select", "a", "b", "c"),
+            term("where", ">(a, 0)")
+          ),
+          term("minus", "a", "b", "c")
+        ),
+        term("groupBy", "b"),
+        term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+      ),
+      term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+    )
+
+    util.verifyTable(result, expected)
+  }
 }
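
The minusAll case is safe to transpose because filtering commutes with bag difference: a value survives MINUS ALL with multiplicity max(0, countLeft - countRight), and the same predicate either keeps or removes all occurrences of that value on both sides. A plain-Scala sanity check of that equivalence (no Flink involved; names and data are illustrative):

// Illustrative check that Filter(A minusAll B) == Filter(A) minusAll Filter(B).
object MinusTransposeSanityCheck extends App {
  // Bag (multiset) difference: remove one occurrence from `l` per element of `r`.
  def minusAll[T](l: Seq[T], r: Seq[T]): Seq[T] =
    r.foldLeft(l) { (acc, x) =>
      val i = acc.indexOf(x)
      if (i >= 0) acc.patch(i, Nil, 1) else acc
    }

  val left  = Seq(1, -1, 2, 2)
  val right = Seq(2, -1)
  val p     = (x: Int) => x > 0

  val filterAbove = minusAll(left, right).filter(p)           // filter after the minus
  val filterBelow = minusAll(left.filter(p), right.filter(p)) // filter each input first

  assert(filterAbove.sorted == filterBelow.sorted)            // both are Seq(1, 2)
  println(filterAbove)
}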

http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
new file mode 100644
index 0000000..b1b700b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase {
+
+  @Test
+  def testFilterUnionTranspose(): Unit = {
+    val util = streamTestUtil()
+    val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+    val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+    val result = left.unionAll(right)
+      .where('a > 0)
+      .groupBy('b)
+      .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupAggregate",
+          binaryNode(
+            "DataStreamUnion",
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(0),
+              term("select", "a", "b", "c"),
+              term("where", ">(a, 0)")
+            ),
+            unaryNode(
+              "DataStreamCalc",
+              streamTableNode(1),
+              term("select", "a", "b", "c"),
+              term("where", ">(a, 0)")
+            ),
+            term("union all", "a", "b", "c")
+          ),
+          term("groupBy", "b"),
+          term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+        ),
+      term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+    )
+
+    util.verifyTable(result, expected)
+  }
+}