You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 10:26:04 UTC

flink git commit: [FLINK-5357] [table] Fix dropped projections

Repository: flink
Updated Branches:
  refs/heads/master 536e4b352 -> 614acc3e7


[FLINK-5357] [table] Fix dropped projections

This closes #3063.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614acc3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614acc3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614acc3e

Branch: refs/heads/master
Commit: 614acc3e7b86ff40364fcfd62b3064c93755379a
Parents: 536e4b3
Author: twalthr <tw...@apache.org>
Authored: Thu Jan 5 14:14:52 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 11:22:04 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/logical/operators.scala    | 17 ++++---------
 .../scala/batch/table/FieldProjectionTest.scala | 25 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index eae42cd..743bdfe 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -92,20 +92,11 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
   }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
-    val allAlias = projectList.forall(_.isInstanceOf[Alias])
     child.construct(relBuilder)
-    if (allAlias) {
-      // Calcite's RelBuilder does not translate identity projects even if they rename fields.
-      //   Add a projection ourselves (will be automatically removed by translation rules).
-      val project = LogicalProject.create(relBuilder.peek(),
-        // avoid AS call
-        projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
-        projectList.map(_.name).asJava)
-      relBuilder.build()  // pop previous relNode
-      relBuilder.push(project)
-    } else {
-      relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
-    }
+    relBuilder.project(
+      projectList.map(_.toRexNode(relBuilder)).asJava,
+      projectList.map(_.name).asJava,
+      true)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index a80e0cb..cc691d2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -280,6 +280,30 @@ class FieldProjectionTest extends TableTestBase {
     streamUtil.verifyTable(resultTable, expected)
   }
 
+  @Test
+  def testSelectFromAggregatedPojoTable(): Unit = {
+    val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
+    val resultTable = sourceTable
+      .groupBy('word)
+      .select('word, 'frequency.sum as 'frequency)
+      .filter('frequency === 2)
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetAggregate",
+          batchTableNode(0),
+          term("groupBy", "word"),
+          term("select", "word", "SUM(frequency) AS TMP_0")
+        ),
+        term("select", "word, frequency"),
+        term("where", "=(frequency, 2)")
+      )
+
+    util.verifyTable(resultTable, expected)
+  }
+
   @Test(expected = classOf[ValidationException])
   def testSelectFromBatchWindow1(): Unit = {
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
@@ -315,4 +339,5 @@ object FieldProjectionTest {
     def eval(s: String): Int = s.hashCode()
   }
 
+  case class WC(word: String, frequency: Long)
 }