You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/16 15:15:33 UTC
flink git commit: [FLINK-7986] [table] Introduce FilterSetOpTransposeRule
Repository: flink
Updated Branches:
refs/heads/master cd1fbc078 -> 81dc260dc
[FLINK-7986] [table] Introduce FilterSetOpTransposeRule
This closes #4956.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d
Branch: refs/heads/master
Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e
Parents: cd1fbc0
Author: Xpray <le...@gmail.com>
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Nov 16 14:43:50 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/rules/FlinkRuleSets.scala | 2 +
.../api/batch/table/SetOperatorsTest.scala | 80 ++++++++++++++++++++
.../api/stream/table/SetOperatorsTest.scala | 68 +++++++++++++++++
3 files changed, 150 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
FilterJoinRule.JOIN,
// push filter through an aggregation
FilterAggregateTransposeRule.INSTANCE,
+ // push filter through set operation
+ FilterSetOpTransposeRule.INSTANCE,
// aggregation and projection rules
AggregateProjectMergeRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
index 2d4e205..35f4429 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala
@@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase {
util.verifyJavaTable(in, expected)
}
+
+ @Test
+ def testFilterUnionTranspose(): Unit = {
+ val util = batchTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.unionAll(right)
+ .where('a > 0)
+ .groupBy('b)
+ .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ term("union", "a", "b", "c")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+ ),
+ term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testFilterMinusTranspose(): Unit = {
+ val util = batchTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.minusAll(right)
+ .where('a > 0)
+ .groupBy('b)
+ .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ binaryNode(
+ "DataSetMinus",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ term("minus", "a", "b", "c")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+ ),
+ term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+ )
+
+ util.verifyTable(result, expected)
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
new file mode 100644
index 0000000..b1b700b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
+import org.junit.Test
+
+class SetOperatorsTest extends TableTestBase {
+
+ @Test
+ def testFilterUnionTranspose(): Unit = {
+ val util = streamTestUtil()
+ val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c)
+ val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c)
+
+ val result = left.unionAll(right)
+ .where('a > 0)
+ .groupBy('b)
+ .select('a.sum as 'a, 'b as 'b, 'c.count as 'c)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupAggregate",
+ binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(1),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 0)")
+ ),
+ term("union all", "a", "b", "c")
+ ),
+ term("groupBy", "b"),
+ term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1")
+ ),
+ term("select", "TMP_0 AS a", "b", "TMP_1 AS c")
+ )
+
+ util.verifyTable(result, expected)
+ }
+}