You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/15 17:28:46 UTC
flink git commit: [FLINK-7942] [table] Reduce aliasing in RexNodes
Repository: flink
Updated Branches:
refs/heads/master 59df4b75f -> b6a2dc345
[FLINK-7942] [table] Reduce aliasing in RexNodes
This closes #5019.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b6a2dc34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b6a2dc34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b6a2dc34
Branch: refs/heads/master
Commit: b6a2dc345f37c4b643789e98d02e23a022d31415
Parents: 59df4b7
Author: twalthr <tw...@apache.org>
Authored: Wed Nov 15 12:07:16 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Nov 15 18:26:50 2017 +0100
----------------------------------------------------------------------
.../org/apache/flink/table/api/table.scala | 12 ++++-
.../flink/table/plan/logical/operators.scala | 26 +++++++++--
.../flink/table/api/batch/table/JoinTest.scala | 47 ++++++++++++++++++++
.../flink/table/plan/RetractionRulesTest.scala | 40 ++++++-----------
4 files changed, 93 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 7349a0e..071cc69 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -1106,7 +1106,13 @@ class OverWindowedTable(
new Table(
table.tableEnv,
- Project(expandedOverFields.map(UnresolvedAlias), table.logicalPlan).validate(table.tableEnv))
+ Project(
+ expandedOverFields.map(UnresolvedAlias),
+ table.logicalPlan,
+ // required for proper projection push down
+ explicitAlias = true)
+ .validate(table.tableEnv)
+ )
}
def select(fields: String): Table = {
@@ -1150,7 +1156,9 @@ class WindowGroupedTable(
propNames.map(a => Alias(a._1, a._2)).toSeq,
aggNames.map(a => Alias(a._1, a._2)).toSeq,
Project(projectFields, table.logicalPlan).validate(table.tableEnv)
- ).validate(table.tableEnv)
+ ).validate(table.tableEnv),
+ // required for proper resolution of the time attribute in multi-windows
+ explicitAlias = true
).validate(table.tableEnv))
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index fe2bfe5..a2bd1e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -42,7 +42,12 @@ import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
import scala.collection.JavaConverters._
import scala.collection.mutable
-case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
+case class Project(
+ projectList: Seq[NamedExpression],
+ child: LogicalNode,
+ explicitAlias: Boolean = false)
+ extends UnaryNode {
+
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
@@ -61,7 +66,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
throw new RuntimeException("This should never be called and probably points to a bug.")
}
}
- Project(newProjectList, child)
+ Project(newProjectList, child, explicitAlias)
}
override def validate(tableEnv: TableEnvironment): LogicalNode = {
@@ -90,8 +95,19 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
child.construct(relBuilder)
+
+ val exprs = if (explicitAlias) {
+ projectList
+ } else {
+ // remove AS expressions, according to Calcite they should not be in a final RexNode
+ projectList.map {
+ case Alias(e: Expression, _, _) => e
+ case e: Expression => e
+ }
+ }
+
relBuilder.project(
- projectList.map(_.toRexNode(relBuilder)).asJava,
+ exprs.map(_.toRexNode(relBuilder)).asJava,
projectList.map(_.name).asJava,
true)
}
@@ -116,7 +132,9 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una
val input = child.output
Project(
names.zip(input).map { case (name, attr) =>
- Alias(attr, name)} ++ input.drop(names.length), child)
+ Alias(attr, name)} ++ input.drop(names.length),
+ child,
+ explicitAlias = true)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
index 9ee7fc2..ce62252 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/JoinTest.scala
@@ -19,7 +19,9 @@
package org.apache.flink.table.api.batch.table
import org.apache.flink.api.scala._
+import org.apache.flink.table.api.batch.table.JoinTest.Merger
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
@@ -301,4 +303,49 @@ class JoinTest extends TableTestBase {
util.verifyTable(joined, expected)
}
+
+ @Test
+ def testFilterJoinRule(): Unit = {
+ val util = batchTestUtil()
+ val t1 = util.addTable[(String, Int, Int)]('a, 'b, 'c)
+ val t2 = util.addTable[(String, Int, Int)]('d, 'e, 'f)
+ val results = t1
+ .leftOuterJoin(t2, 'b === 'e)
+ .select('c, Merger('c, 'f) as 'c0)
+ .select(Merger('c, 'c0) as 'c1)
+ .where('c1 >= 0)
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ binaryNode(
+ "DataSetJoin",
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(0),
+ term("select", "b", "c")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ batchTableNode(1),
+ term("select", "e", "f")
+ ),
+ term("where", "=(b, e)"),
+ term("join", "b", "c", "e", "f"),
+ term("joinType", "LeftOuterJoin")
+ ),
+ term("select", "Merger$(c, Merger$(c, f)) AS c1"),
+ term("where", ">=(Merger$(c, Merger$(c, f)), 0)")
+ )
+
+ util.verifyTable(results, expected)
+ }
+}
+
+object JoinTest {
+
+ object Merger extends ScalarFunction {
+ def eval(f0: Int, f1: Int): Int = {
+ f0 + f1
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b6a2dc34/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
index 999a808..ba3c314 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RetractionRulesTest.scala
@@ -86,22 +86,18 @@ class RetractionRulesTest extends TableTestBase {
val expected =
unaryNode(
- "DataStreamCalc",
+ "DataStreamGroupAggregate",
unaryNode(
- "DataStreamGroupAggregate",
+ "DataStreamCalc",
unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamGroupAggregate",
- "DataStreamScan(true, Acc)",
- "true, AccRetract"
- ),
+ "DataStreamGroupAggregate",
+ "DataStreamScan(true, Acc)",
"true, AccRetract"
),
- s"$defaultStatus"
+ "true, AccRetract"
),
s"$defaultStatus"
- )
+ )
util.verifyTableTrait(resultTable, expected)
}
@@ -253,28 +249,20 @@ class RetractionRulesTest extends TableTestBase {
val expected =
unaryNode(
- "DataStreamCalc",
+ "DataStreamGroupAggregate",
unaryNode(
- "DataStreamGroupAggregate",
- unaryNode(
- "DataStreamCalc",
- binaryNode(
- "DataStreamUnion",
- unaryNode(
- "DataStreamCalc",
- unaryNode(
- "DataStreamGroupAggregate",
- "DataStreamScan(true, Acc)",
- "true, AccRetract"
- ),
- "true, AccRetract"
- ),
+ "DataStreamCalc",
+ binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamGroupAggregate",
"DataStreamScan(true, Acc)",
"true, AccRetract"
),
+ "DataStreamScan(true, Acc)",
"true, AccRetract"
),
- s"$defaultStatus"
+ "true, AccRetract"
),
s"$defaultStatus"
)