You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/11 09:31:45 UTC

flink git commit: [FLINK-6094] [table] Use the lexicographically smallest attribute as the common group id

Repository: flink
Updated Branches:
  refs/heads/master 00ad0eb12 -> e44700061


[FLINK-6094] [table] Use the lexicographically smallest attribute as the common group id

This closes #5273.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4470006
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4470006
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4470006

Branch: refs/heads/master
Commit: e44700061fb9bf1813f86ef223ecf285b1f9d83a
Parents: 00ad0eb
Author: 军长 <he...@alibaba-inc.com>
Authored: Wed Jan 10 15:48:19 2018 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Jan 11 10:31:12 2018 +0100

----------------------------------------------------------------------
 .../table/plan/util/UpdatingPlanChecker.scala   |  9 +++++++--
 .../table/plan/UpdatingPlanCheckerTest.scala    | 21 +++++++++++++++++---
 2 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4470006/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 56465cc..c1ebac8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -39,8 +39,13 @@ object UpdatingPlanChecker {
 
   /** Extracts the unique keys of the table produced by the plan. */
   def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+    getUniqueKeyGroups(plan).map(_.map(_._1).toArray)
+  }
+
+  /** Extracts the unique keys and groups of the table produced by the plan. */
+  def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = {
     val keyExtractor = new UniqueKeyExtractor
-    keyExtractor.visit(plan).map(_.map(_._1).toArray)
+    keyExtractor.visit(plan)
   }
 
   private class AppendOnlyValidator extends RelVisitor {
@@ -101,7 +106,7 @@ object UpdatingPlanChecker {
               .filter(io => inputKeys.get.map(e => e._1).contains(io._1))
 
             val inputKeysMap = inputKeys.get.toMap
-            val inOutGroups = inputKeysAndOutput
+            val inOutGroups = inputKeysAndOutput.sorted.reverse
               .map(e => (inputKeysMap(e._1), e._2))
               .toMap
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e4470006/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
index a648724..aef6443 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
@@ -83,6 +83,9 @@ class UpdatingPlanCheckerTest {
       .select('a as 'a1, 'a as 'a2, 'b.count)
 
     util.verifyTableUniqueKey(resultTable, Seq("a1", "a2"))
+    // both a1 and a2 belong to the same group, i.e., a1. We use the lexicographically smallest
+    // attribute as the common group id.
+    util.verifyTableKeyGroups(resultTable, Seq(("a1", "a1"), ("a2", "a1")))
   }
 
   @Test
@@ -210,13 +213,25 @@ class UpdatePlanCheckerUtil extends StreamTableTestUtil {
     verifyTableUniqueKey(tableEnv.sql(query), expected)
   }
 
-  def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
+  def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = {
     val relNode = resultTable.getRelNode
     val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
-    val actual = UpdatingPlanChecker.getUniqueKeyFields(optimized)
+    UpdatingPlanChecker.getUniqueKeyGroups(optimized)
+  }
 
+  def verifyTableKeyGroups(resultTable: Table, expected: Seq[(String, String)]): Unit = {
+    val actual = getKeyGroups(resultTable)
+    if (actual.isDefined) {
+      assertEquals(expected.sorted, actual.get.sorted)
+    } else {
+      assertEquals(expected.sorted, Nil)
+    }
+  }
+
+  def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
+    val actual = getKeyGroups(resultTable).map(_.map(_._1))
     if (actual.isDefined) {
-      assertEquals(expected.sorted, actual.get.toSeq.sorted)
+      assertEquals(expected.sorted, actual.get.sorted)
     } else {
       assertEquals(expected.sorted, Nil)
     }