You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 10:26:04 UTC
flink git commit: [FLINK-5357] [table] Fix dropped projections
Repository: flink
Updated Branches:
refs/heads/master 536e4b352 -> 614acc3e7
[FLINK-5357] [table] Fix dropped projections
This closes #3063.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614acc3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614acc3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614acc3e
Branch: refs/heads/master
Commit: 614acc3e7b86ff40364fcfd62b3064c93755379a
Parents: 536e4b3
Author: twalthr <tw...@apache.org>
Authored: Thu Jan 5 14:14:52 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 11:22:04 2017 +0100
----------------------------------------------------------------------
.../flink/table/plan/logical/operators.scala | 17 ++++---------
.../scala/batch/table/FieldProjectionTest.scala | 25 ++++++++++++++++++++
2 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index eae42cd..743bdfe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -92,20 +92,11 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
}
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- val allAlias = projectList.forall(_.isInstanceOf[Alias])
child.construct(relBuilder)
- if (allAlias) {
- // Calcite's RelBuilder does not translate identity projects even if they rename fields.
- // Add a projection ourselves (will be automatically removed by translation rules).
- val project = LogicalProject.create(relBuilder.peek(),
- // avoid AS call
- projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
- projectList.map(_.name).asJava)
- relBuilder.build() // pop previous relNode
- relBuilder.push(project)
- } else {
- relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
- }
+ relBuilder.project(
+ projectList.map(_.toRexNode(relBuilder)).asJava,
+ projectList.map(_.name).asJava,
+ true)
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index a80e0cb..cc691d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -280,6 +280,30 @@ class FieldProjectionTest extends TableTestBase {
streamUtil.verifyTable(resultTable, expected)
}
+ @Test
+ def testSelectFromAggregatedPojoTable(): Unit = {
+ val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
+ val resultTable = sourceTable
+ .groupBy('word)
+ .select('word, 'frequency.sum as 'frequency)
+ .filter('frequency === 2)
+
+ val expected =
+ unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetAggregate",
+ batchTableNode(0),
+ term("groupBy", "word"),
+ term("select", "word", "SUM(frequency) AS TMP_0")
+ ),
+ term("select", "word, frequency"),
+ term("where", "=(frequency, 2)")
+ )
+
+ util.verifyTable(resultTable, expected)
+ }
+
@Test(expected = classOf[ValidationException])
def testSelectFromBatchWindow1(): Unit = {
val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
@@ -315,4 +339,5 @@ object FieldProjectionTest {
def eval(s: String): Int = s.hashCode()
}
+ case class WC(word: String, frequency: Long)
}