You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/01/11 09:31:45 UTC
flink git commit: [FLINK-6094] [table] Use the lexicographic smallest
attribute as the common group id
Repository: flink
Updated Branches:
refs/heads/master 00ad0eb12 -> e44700061
[FLINK-6094] [table] Use the lexicographic smallest attribute as the common group id
This closes #5273.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4470006
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4470006
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4470006
Branch: refs/heads/master
Commit: e44700061fb9bf1813f86ef223ecf285b1f9d83a
Parents: 00ad0eb
Author: 军长 <he...@alibaba-inc.com>
Authored: Wed Jan 10 15:48:19 2018 +0800
Committer: twalthr <tw...@apache.org>
Committed: Thu Jan 11 10:31:12 2018 +0100
----------------------------------------------------------------------
.../table/plan/util/UpdatingPlanChecker.scala | 9 +++++++--
.../table/plan/UpdatingPlanCheckerTest.scala | 21 +++++++++++++++++---
2 files changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4470006/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 56465cc..c1ebac8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -39,8 +39,13 @@ object UpdatingPlanChecker {
/** Extracts the unique keys of the table produced by the plan. */
def getUniqueKeyFields(plan: RelNode): Option[Array[String]] = {
+ getUniqueKeyGroups(plan).map(_.map(_._1).toArray)
+ }
+
+ /** Extracts the unique keys and groups of the table produced by the plan. */
+ def getUniqueKeyGroups(plan: RelNode): Option[Seq[(String, String)]] = {
val keyExtractor = new UniqueKeyExtractor
- keyExtractor.visit(plan).map(_.map(_._1).toArray)
+ keyExtractor.visit(plan)
}
private class AppendOnlyValidator extends RelVisitor {
@@ -101,7 +106,7 @@ object UpdatingPlanChecker {
.filter(io => inputKeys.get.map(e => e._1).contains(io._1))
val inputKeysMap = inputKeys.get.toMap
- val inOutGroups = inputKeysAndOutput
+ val inOutGroups = inputKeysAndOutput.sorted.reverse
.map(e => (inputKeysMap(e._1), e._2))
.toMap
http://git-wip-us.apache.org/repos/asf/flink/blob/e4470006/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
index a648724..aef6443 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
@@ -83,6 +83,9 @@ class UpdatingPlanCheckerTest {
.select('a as 'a1, 'a as 'a2, 'b.count)
util.verifyTableUniqueKey(resultTable, Seq("a1", "a2"))
+ // both a1 and a2 belong to the same group, i.e., a1. We use the lexicographic smallest
+ // attribute as the common group id
+ util.verifyTableKeyGroups(resultTable, Seq(("a1", "a1"), ("a2", "a1")))
}
@Test
@@ -210,13 +213,25 @@ class UpdatePlanCheckerUtil extends StreamTableTestUtil {
verifyTableUniqueKey(tableEnv.sql(query), expected)
}
- def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
+ def getKeyGroups(resultTable: Table): Option[Seq[(String, String)]] = {
val relNode = resultTable.getRelNode
val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false)
- val actual = UpdatingPlanChecker.getUniqueKeyFields(optimized)
+ UpdatingPlanChecker.getUniqueKeyGroups(optimized)
+ }
+ def verifyTableKeyGroups(resultTable: Table, expected: Seq[(String, String)]): Unit = {
+ val actual = getKeyGroups(resultTable)
+ if (actual.isDefined) {
+ assertEquals(expected.sorted, actual.get.sorted)
+ } else {
+ assertEquals(expected.sorted, Nil)
+ }
+ }
+
+ def verifyTableUniqueKey(resultTable: Table, expected: Seq[String]): Unit = {
+ val actual = getKeyGroups(resultTable).map(_.map(_._1))
if (actual.isDefined) {
- assertEquals(expected.sorted, actual.get.toSeq.sorted)
+ assertEquals(expected.sorted, actual.get.sorted)
} else {
assertEquals(expected.sorted, Nil)
}