You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/02 08:09:01 UTC
[flink] branch master updated: [FLINK-12703][table-planner-blink]
Introduce metadata handlers on SEMI/ANTI join and lookup join
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0119afa [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join
0119afa is described below
commit 0119afaf28da2cf39e772b4fb568812ad09cfa79
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 1 17:44:31 2019 +0800
[FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join
This closes #8588
---
.../plan/metadata/FlinkRelMdColumnInterval.scala | 10 ++
.../plan/metadata/FlinkRelMdColumnNullCount.scala | 10 ++
.../metadata/FlinkRelMdColumnOriginNullCount.scala | 4 +-
.../plan/metadata/FlinkRelMdColumnUniqueness.scala | 18 ++-
.../metadata/FlinkRelMdModifiedMonotonicity.scala | 13 +-
.../flink/table/plan/metadata/FlinkRelMdSize.scala | 7 +-
.../plan/metadata/FlinkRelMdUniqueGroups.scala | 8 +-
.../table/plan/metadata/FlinkRelMdUniqueKeys.scala | 19 ++-
.../table/plan/batch/sql/join/LookupJoinTest.scala | 1 -
.../metadata/FlinkRelMdColumnIntervalTest.scala | 23 +++
.../metadata/FlinkRelMdColumnNullCountTest.scala | 14 ++
.../FlinkRelMdColumnOriginNullCountTest.scala | 10 +-
.../metadata/FlinkRelMdColumnUniquenessTest.scala | 37 +++++
.../metadata/FlinkRelMdDistinctRowCountTest.scala | 18 +++
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 174 ++++++++++++++++++++-
.../FlinkRelMdModifiedMonotonicityTest.scala | 3 +
.../FlinkRelMdPercentageOriginalRowsTest.scala | 4 +
.../metadata/FlinkRelMdPopulationSizeTest.scala | 14 ++
.../plan/metadata/FlinkRelMdRowCountTest.scala | 17 ++
.../plan/metadata/FlinkRelMdSelectivityTest.scala | 12 ++
.../table/plan/metadata/FlinkRelMdSizeTest.scala | 5 +
.../plan/metadata/FlinkRelMdUniqueGroupsTest.scala | 45 +++++-
.../plan/metadata/FlinkRelMdUniqueKeysTest.scala | 23 +++
.../table/plan/metadata/MetadataTestUtil.scala | 19 ++-
24 files changed, 476 insertions(+), 32 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
index 20d4610..ed13607 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -101,6 +101,16 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
}
/**
+ * Gets interval of the given column on Snapshot.
+ *
+ * @param snapshot Snapshot RelNode
+ * @param mq RelMetadataQuery instance
+ * @param index the index of the given column
+ * @return interval of the given column on Snapshot.
+ */
+ def getColumnInterval(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): ValueInterval = null
+
+ /**
* Gets interval of the given column on Project.
*
* Note: Only support the simple RexNode, e.g RexInputRef.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
index ec7a59c..673f1cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
@@ -68,6 +68,16 @@ class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount]
}
/**
+ * Gets the null count of the given column on Snapshot.
+ *
+ * @param snapshot Snapshot RelNode
+ * @param mq RelMetadataQuery instance
+ * @param index the index of the given column
+ * @return the null count of the given column on Snapshot.
+ */
+ def getColumnNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): JDouble = null
+
+ /**
* Gets the null count of the given column in Project.
*
* @param project Project RelNode
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
index 6b88b9b..61bb21b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
@@ -58,6 +58,8 @@ class FlinkRelMdColumnOriginNullCount private extends MetadataHandler[ColumnOrig
}
}
+ def getColumnOriginNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): JDouble = null
+
def getColumnOriginNullCount(rel: Project, mq: RelMetadataQuery, index: Int): JDouble = {
getColumnOriginNullOnProjects(rel.getInput, rel.getProjects, mq, index)
}
@@ -119,8 +121,6 @@ class FlinkRelMdColumnOriginNullCount private extends MetadataHandler[ColumnOrig
}
}
- // TODO supports FlinkLogicalSnapshot
-
def getColumnOriginNullCount(rel: RelNode, mq: RelMetadataQuery, index: Int): JDouble = null
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index e6a2686..0afaa62 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
+import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
@@ -498,6 +499,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
)
}
+ def areColumnsUnique(
+ join: CommonLookupJoin,
+ mq: RelMetadataQuery,
+ columns: ImmutableBitSet,
+ ignoreNulls: Boolean): JBoolean = {
+ val left = join.getInput
+ areColumnsUniqueOfJoin(
+ join.joinInfo, join.joinType, left.getRowType,
+ (leftSet: ImmutableBitSet) => mq.areColumnsUnique(left, leftSet, ignoreNulls),
+ // TODO get uniqueKeys from TableSchema of TableSource
+ (_: ImmutableBitSet) => null,
+ mq, columns
+ )
+ }
+
def areColumnsUniqueOfJoin(
joinInfo: JoinInfo,
joinRelType: JoinRelType,
@@ -598,8 +614,6 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = null
- // TODO supports temporal table join
-
def areColumnsUnique(
rel: SetOp,
mq: RelMetadataQuery,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 914768e..3bba1e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -315,7 +315,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
val childMono = inputMonotonicity.fieldMonotonicities(aggCall.getArgList.head)
val currentMono = fieldMonotonicities(index)
if (childMono != currentMono &&
- !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+ !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
// count will be Increasing even if child is NOT_MONOTONIC
fieldMonotonicities(index) = NOT_MONOTONIC
}
@@ -369,12 +369,16 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
}
def getRelModifiedMonotonicity(rel: Join, mq: RelMetadataQuery): RelModifiedMonotonicity = {
+ val joinType = rel.getJoinType
+ if (joinType.equals(JoinRelType.ANTI)) {
+ return null
+ }
+
val left = rel.getLeft
val right = rel.getRight
val joinInfo = rel.analyzeCondition
val leftKeys = joinInfo.leftKeys
val rightKeys = joinInfo.rightKeys
- val joinType = rel.getJoinType
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
// if group set contains update return null
@@ -389,8 +393,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
val isKeyAllAppend = isAllConstantOnKeys(left, leftKeys.toIntArray) &&
isAllConstantOnKeys(right, rightKeys.toIntArray)
- if (!containDelete && !joinType.equals(JoinRelType.ANTI) && isKeyAllAppend &&
- (containUpdate && joinInfo.isEqui || !containUpdate)) {
+ if (!containDelete && isKeyAllAppend && (containUpdate && joinInfo.isEqui || !containUpdate)) {
// output rowtype of semi equals to the rowtype of left child
if (joinType.equals(JoinRelType.SEMI)) {
@@ -426,7 +429,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
getMonotonicity(rel.getInput(0), mq, rel.getRowType.getFieldCount)
}
- // TODO supports temporal table join
+ // TODO supports temporal table function join
def getRelModifiedMonotonicity(rel: Union, mq: RelMetadataQuery): RelModifiedMonotonicity = {
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
index 5cd6ff3..3a68f7e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
@@ -245,10 +245,9 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] {
def averageColumnSizes(rel: Join, mq: RelMetadataQuery): JList[JDouble] = {
val acsOfLeft = mq.getAverageColumnSizes(rel.getLeft)
- val acsOfRight = if (rel.getJoinType.projectsRight) {
- mq.getAverageColumnSizes(rel.getRight)
- } else {
- null
+ val acsOfRight = rel.getJoinType match {
+ case JoinRelType.SEMI | JoinRelType.ANTI => null
+ case _ => mq.getAverageColumnSizes(rel.getRight)
}
if (acsOfLeft == null && acsOfRight == null) {
null
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
index 73a3adb..62ab472 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -341,11 +341,17 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] {
mq: RelMetadataQuery,
columns: ImmutableBitSet): ImmutableBitSet = {
require(join.getSystemFieldList.isEmpty)
+ val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+ join.getJoinType match {
+ case JoinRelType.SEMI | JoinRelType.ANTI =>
+ return fmq.getUniqueGroups(join.getLeft, columns)
+ case _ => // do nothing
+ }
+
val leftFieldCount = join.getLeft.getRowType.getFieldCount
val (leftColumns, rightColumns) =
FlinkRelMdUtil.splitColumnsIntoLeftAndRight(leftFieldCount, columns)
- val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
val leftUniqueGroups = fmq.getUniqueGroups(join.getLeft, leftColumns)
val rightUniqueGroups = fmq.getUniqueGroups(join.getRight, rightColumns)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
index 956c45d..84d66cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.metadata
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
+import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
@@ -49,7 +50,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
def getDef: MetadataDef[BuiltInMetadata.UniqueKeys] = BuiltInMetadata.UniqueKeys.DEF
-
def getUniqueKeys(
rel: TableScan,
mq: RelMetadataQuery,
@@ -410,6 +410,21 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
getJoinUniqueKeys(joinInfo, rel.joinType, rel.getLeft, rel.getRight, mq, ignoreNulls)
}
+ def getUniqueKeys(
+ join: CommonLookupJoin,
+ mq: RelMetadataQuery,
+ ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
+ val left = join.getInput
+ val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
+ val leftType = left.getRowType
+ getJoinUniqueKeys(
+ join.joinInfo, join.joinType, leftType, leftUniqueKeys, null,
+ mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+ // TODO get uniqueKeys from TableSchema of TableSource
+ null,
+ mq)
+ }
+
private def getJoinUniqueKeys(
joinInfo: JoinInfo,
joinRelType: JoinRelType,
@@ -488,8 +503,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
retSet
}
- // TODO supports temporal table join
-
def getUniqueKeys(
rel: Correlate,
mq: RelMetadataQuery,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 8bc5cc2..1d269bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -84,7 +84,6 @@ class LookupJoinTest extends TableTestBase {
)
}
-
@Test
def testLogicalPlan(): Unit = {
val sql1 =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index bbb5ba3..6684ab8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -86,6 +86,13 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
}
@Test
+ def testGetColumnIntervalOnSnapshot(): Unit = {
+ (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+ assertNull(mq.getColumnInterval(flinkLogicalSnapshot, idx))
+ }
+ }
+
+ @Test
def testGetColumnIntervalOnProject(): Unit = {
assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalProject, 0))
assertNull(mq.getColumnInterval(logicalProject, 1))
@@ -525,6 +532,22 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
assertEquals(ValueInterval(8L, 1000L), mq.getColumnInterval(join, 6))
assertNull(mq.getColumnInterval(join, 7))
assertNull(mq.getColumnInterval(join, 8))
+
+ assertEquals(ValueInterval(0, null, includeLower = true),
+ mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 0))
+ assertEquals(ValueInterval(1L, 800000000L),
+ mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 1))
+ assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 2))
+ assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 3))
+ assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
+
+ assertEquals(ValueInterval(0, null, includeLower = true),
+ mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 0))
+ assertEquals(ValueInterval(1L, 800000000L),
+ mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 1))
+ assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 2))
+ assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 3))
+ assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
index 7eaf02e..e227a73 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
@@ -52,6 +52,13 @@ class FlinkRelMdColumnNullCountTest extends FlinkRelMdHandlerTestBase {
}
@Test
+ def testGetColumnNullCountOnSnapshot(): Unit = {
+ (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+ assertNull(mq.getColumnNullCount(flinkLogicalSnapshot, idx))
+ }
+ }
+
+ @Test
def testGetColumnNullCountOnProject(): Unit = {
assertEquals(0.0, mq.getColumnNullCount(logicalProject, 0))
assertEquals(0.0, mq.getColumnNullCount(logicalProject, 1))
@@ -262,6 +269,13 @@ class FlinkRelMdColumnNullCountTest extends FlinkRelMdHandlerTestBase {
(0 until fullJoin.getRowType.getFieldCount).foreach { idx =>
assertNull(mq.getColumnNullCount(fullJoin, idx))
}
+
+ // semi/anti join
+ Array(logicalSemiJoinWithEquiAndNonEquiCond, logicalAntiJoinWithoutEquiCond).foreach { join =>
+ (0 until join.getRowType.getFieldCount).foreach { idx =>
+ assertNull(mq.getColumnNullCount(join, idx))
+ }
+ }
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
index 40a11a9..1d6f86b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
@@ -46,6 +46,13 @@ class FlinkRelMdColumnOriginNullCountTest extends FlinkRelMdHandlerTestBase {
}
@Test
+ def testGetColumnOriginNullCountOnSnapshot(): Unit = {
+ (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+ assertNull(mq.getColumnOriginNullCount(flinkLogicalSnapshot, idx))
+ }
+ }
+
+ @Test
def testGetColumnOriginNullCountOnProject(): Unit = {
assertEquals(0.0, mq.getColumnOriginNullCount(logicalProject, 0))
assertEquals(0.0, mq.getColumnOriginNullCount(logicalProject, 1))
@@ -127,7 +134,8 @@ class FlinkRelMdColumnOriginNullCountTest extends FlinkRelMdHandlerTestBase {
assertEquals(0.0, mq.getColumnOriginNullCount(innerJoin2, 3))
Array(logicalLeftJoinOnUniqueKeys, logicalRightJoinNotOnUniqueKeys,
- logicalFullJoinWithEquiAndNonEquiCond).foreach { join =>
+ logicalFullJoinWithEquiAndNonEquiCond, logicalSemiJoinNotOnUniqueKeys,
+ logicalSemiJoinWithEquiAndNonEquiCond).foreach { join =>
(0 until join.getRowType.getFieldCount).foreach { idx =>
assertNull(mq.getColumnOriginNullCount(join, idx))
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
index 0bf706b..6929247 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
@@ -496,6 +496,43 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
assertFalse(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(1, 6)))
assertFalse(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(5, 6)))
assertTrue(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 5, 6)))
+
+ // semi/anti join
+ Array(logicalSemiJoinOnUniqueKeys, logicalSemiJoinNotOnUniqueKeys,
+ logicalSemiJoinOnDisjointKeys, logicalAntiJoinOnUniqueKeys, logicalAntiJoinNotOnUniqueKeys,
+ logicalAntiJoinOnDisjointKeys).foreach { join =>
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(0)))
+ assertTrue(mq.areColumnsUnique(join, ImmutableBitSet.of(1)))
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(2)))
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(3)))
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(4)))
+ assertTrue(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 1)))
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 2)))
+ }
+ }
+
+ @Test
+ def testAreColumnsUniqueOnLookupJoin(): Unit = {
+ Array(batchLookupJoin, streamLookupJoin).foreach { join =>
+ assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of()))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(2)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(3)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(4)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(5)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(6)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(7)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(8)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(9)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 1)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1, 2)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 7)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1, 7)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 8)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(7, 8)))
+ assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(8, 9)))
+ }
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
index ec7dda4..17a14e8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
@@ -587,6 +587,24 @@ class FlinkRelMdDistinctRowCountTest extends FlinkRelMdHandlerTestBase {
mq.getDistinctRowCount(logicalFullJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null))
assertEquals(505696447.06,
mq.getDistinctRowCount(logicalFullJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null), 1e-2)
+
+ assertEquals(50,
+ mq.getDistinctRowCount(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0), null), 1e-2)
+ assertEquals(50,
+ mq.getDistinctRowCount(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(1), null), 1e-2)
+ assertEquals(2.0E7,
+ mq.getDistinctRowCount(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null))
+ assertEquals(8.0E8,
+ mq.getDistinctRowCount(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null))
+
+ assertEquals(2.0E7,
+ mq.getDistinctRowCount(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0), null))
+ assertEquals(7.9999995E8,
+ mq.getDistinctRowCount(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(1), null))
+ assertEquals(1.970438234E7,
+ mq.getDistinctRowCount(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null), 1e-2)
+ assertEquals(8.0E7,
+ mq.getDistinctRowCount(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 0d07727..0661f03 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.plan.nodes.logical._
import org.apache.flink.table.plan.nodes.physical.batch._
import org.apache.flink.table.plan.nodes.physical.stream._
import org.apache.flink.table.plan.schema.FlinkRelOptTable
+import org.apache.flink.table.plan.stream.sql.join.TestTemporalTable
import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList
import org.apache.flink.table.plan.util._
import org.apache.flink.table.planner.PlannerContext
@@ -50,18 +51,18 @@ import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel._
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
-import org.apache.calcite.rel.core._
-import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.core.{AggregateCall, Calc, JoinInfo, JoinRelType, Project, Window}
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject, LogicalSort, LogicalTableScan, LogicalValues}
import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery}
import org.apache.calcite.rex._
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.sql.SqlWindow
import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, BOOLEAN, DATE, DOUBLE, FLOAT, TIME, TIMESTAMP, VARCHAR}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{AND, CASE, DIVIDE, EQUALS, GREATER_THAN, LESS_THAN, MINUS, MULTIPLY, OR, PLUS}
import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.util.{DateString, ImmutableBitSet, TimeString, TimestampString}
+import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
import org.junit.{Before, BeforeClass}
import java.math.BigDecimal
@@ -1770,6 +1771,46 @@ class FlinkRelMdHandlerTestBase {
)
}
+ protected lazy val flinkLogicalSnapshot: FlinkLogicalSnapshot = {
+ new FlinkLogicalSnapshot(
+ cluster,
+ flinkLogicalTraits,
+ studentFlinkLogicalScan,
+ relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
+ }
+
+ // SELECT * FROM student AS T JOIN TemporalTable
+ // FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+ protected lazy val (batchLookupJoin, streamLookupJoin) = {
+ val temporalTableSource = new TestTemporalTable
+ val temporalTableRowType = typeFactory.builder()
+ .add("id", SqlTypeName.INTEGER)
+ .add("name", SqlTypeName.VARCHAR)
+ .add("age", SqlTypeName.INTEGER)
+ .build()
+ val batchLookupJoin = new BatchExecLookupJoin(
+ cluster,
+ batchPhysicalTraits,
+ studentBatchScan,
+ temporalTableSource,
+ temporalTableRowType,
+ None,
+ JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)),
+ JoinRelType.INNER
+ )
+ val streamLookupJoin = new StreamExecLookupJoin(
+ cluster,
+ streamPhysicalTraits,
+ studentStreamScan,
+ temporalTableSource,
+ temporalTableRowType,
+ None,
+ JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)),
+ JoinRelType.INNER
+ )
+ (batchLookupJoin, streamLookupJoin)
+ }
+
// select * from MyTable1 join MyTable4 on MyTable1.b = MyTable4.a
protected lazy val logicalInnerJoinOnUniqueKeys: RelNode = relBuilder
.scan("MyTable1")
@@ -2002,12 +2043,133 @@ class FlinkRelMdHandlerTestBase {
.build
// select * from MyTable1 full join MyTable2 on true
- protected lazy val logicalFullWithoutCond: RelNode = relBuilder
+ protected lazy val logicalFullJoinWithoutCond: RelNode = relBuilder
.scan("MyTable1")
.scan("MyTable2")
.join(JoinRelType.FULL, relBuilder.literal(true))
.build
+ // select * from MyTable1 where b in (select a from MyTable4)
+ protected lazy val logicalSemiJoinOnUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable4")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 0)))
+ .build()
+
+ // select * from MyTable1 where a in (select a from MyTable2)
+ protected lazy val logicalSemiJoinNotOnUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+ .build()
+
+ // select * from MyTable1 where b in (select b from MyTable2)
+ protected lazy val logicalSemiJoinOnLHSUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+ .build()
+
+ // select * from MyTable2 where a in (select b from MyTable1)
+ protected lazy val logicalSemiJoinOnRHSUniqueKeys: RelNode = relBuilder
+ .scan("MyTable2")
+ .scan("MyTable1")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+ .build()
+
+ // select * from MyTable1 where b in (select b from MyTable2 where MyTable1.a > MyTable2.a)
+ protected lazy val logicalSemiJoinWithEquiAndNonEquiCond: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.SEMI, relBuilder.call(AND,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+ relBuilder.call(GREATER_THAN, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0))))
+ .build
+
+ // select * from MyTable1 where exists (select * from MyTable2 where MyTable1.a > MyTable2.a)
+ protected lazy val logicalSemiJoinWithoutEquiCond: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(GREATER_THAN, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+ .build()
+
+ // select * from MyTable1 where e in (select e from MyTable2)
+ protected lazy val logicalSemiJoinOnDisjointKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.SEMI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 4), relBuilder.field(2, 1, 4)))
+ .build
+
+ // select * from MyTable1 where not exists (select * from MyTable4 where MyTable1.b = MyTable4.a)
+ protected lazy val logicalAntiJoinOnUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable4")
+ .join(JoinRelType.ANTI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 0)))
+ .build()
+
+ // select * from MyTable1 where not exists (select * from MyTable2 where MyTable1.a = MyTable2.a)
+ protected lazy val logicalAntiJoinNotOnUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.ANTI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+ .build()
+
+ // select * from MyTable1 where not exists (select * from MyTable2 where MyTable1.b = MyTable2.b)
+ protected lazy val logicalAntiJoinOnLHSUniqueKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.ANTI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+ .build()
+
+ // select * from MyTable2 where not exists (select * from MyTable1 where MyTable1.b = MyTable2.b)
+ protected lazy val logicalAntiJoinOnRHSUniqueKeys: RelNode = relBuilder
+ .scan("MyTable2")
+ .scan("MyTable1")
+ .join(JoinRelType.ANTI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+ .build()
+
+ // select * from MyTable1 where b not in (select b from MyTable2 where MyTable1.a = MyTable2.a)
+ // notes: the nullable of b is true
+ protected lazy val logicalAntiJoinWithEquiAndNonEquiCond: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.ANTI, relBuilder.call(AND,
+ relBuilder.call(OR,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+ relBuilder.isNull(
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))),
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0))))
+ .build
+
+ // select * from MyTable1 where b not in (select b from MyTable2)
+ // notes: the nullable of b is true
+ protected lazy val logicalAntiJoinWithoutEquiCond: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.ANTI, relBuilder.call(OR,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+ relBuilder.isNull(
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))))
+ .build
+
+ // select * from MyTable1 where not exists (select e from MyTable2 where MyTable1.e = MyTable2.e)
+ protected lazy val logicalAntiJoinOnDisjointKeys: RelNode = relBuilder
+ .scan("MyTable1")
+ .scan("MyTable2")
+ .join(JoinRelType.ANTI,
+ relBuilder.call(EQUALS, relBuilder.field(2, 0, 4), relBuilder.field(2, 1, 4)))
+ .build
+
// SELECT * FROM MyTable1 UNION ALL SELECT * FROM MyTable2
protected lazy val logicalUnionAll: RelNode = relBuilder
.scan("MyTable1")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index 11a414c..c62a5c0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -281,6 +281,9 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
val join3 = relBuilder.push(left).push(right).join(JoinRelType.INNER,
relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 1))).build()
assertEquals(null, mq.getRelModifiedMonotonicity(join3))
+
+ assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinNotOnUniqueKeys))
+ assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys))
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
index 07604ed..ade834d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
@@ -69,6 +69,10 @@ class FlinkRelMdPercentageOriginalRowsTest extends FlinkRelMdHandlerTestBase {
assertEquals(1.0, mq.getPercentageOriginalRows(logicalRightJoinOnDisjointKeys))
assertEquals(1.0, mq.getPercentageOriginalRows(logicalFullJoinOnUniqueKeys))
assertEquals(1.0, mq.getPercentageOriginalRows(logicalFullJoinNotOnUniqueKeys))
+ assertEquals(1.0, mq.getPercentageOriginalRows(logicalSemiJoinOnUniqueKeys))
+ assertEquals(1.0, mq.getPercentageOriginalRows(logicalSemiJoinNotOnUniqueKeys))
+ assertEquals(1.0, mq.getPercentageOriginalRows(logicalAntiJoinOnUniqueKeys))
+ assertEquals(1.0, mq.getPercentageOriginalRows(logicalAntiJoinNotOnUniqueKeys))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
index 7147b16..d520178 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
@@ -319,6 +319,20 @@ class FlinkRelMdPopulationSizeTest extends FlinkRelMdHandlerTestBase {
mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(1, 5)))
assertEquals(5.112E10,
mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(0, 6)))
+
+ assertEquals(1.0, mq.getPopulationSize(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of()))
+ assertEquals(2.0E7, mq.getPopulationSize(logicalSemiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0)))
+ assertEquals(8.0E8, mq.getPopulationSize(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(1)))
+ assertEquals(8.0E8, mq.getPopulationSize(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1)))
+ assertEquals(8.0E8,
+ mq.getPopulationSize(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 2)))
+
+ assertEquals(1.0, mq.getPopulationSize(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of()))
+ assertEquals(2.0E7, mq.getPopulationSize(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0)))
+ assertEquals(8.0E8, mq.getPopulationSize(logicalAntiJoinOnLHSUniqueKeys, ImmutableBitSet.of(1)))
+ assertEquals(8.0E8, mq.getPopulationSize(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1)))
+ assertEquals(8.0E8,
+ mq.getPopulationSize(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 2)))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
index 61fec6d..0351b8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
@@ -204,10 +204,27 @@ class FlinkRelMdRowCountTest extends FlinkRelMdHandlerTestBase {
assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnUniqueKeys))
assertEquals(8.0E8, mq.getRowCount(logicalFullJoinNotOnUniqueKeys))
+ assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnLHSUniqueKeys))
assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnRHSUniqueKeys))
assertEquals(8.1E8, mq.getRowCount(logicalFullJoinWithEquiAndNonEquiCond))
assertEquals(8.0E15, mq.getRowCount(logicalFullJoinWithoutEquiCond))
assertEquals(8.2E8, mq.getRowCount(logicalFullJoinOnDisjointKeys))
+
+ assertEquals(50.0, mq.getRowCount(logicalSemiJoinOnUniqueKeys))
+ assertEquals(8.0E8, mq.getRowCount(logicalSemiJoinNotOnUniqueKeys))
+ assertEquals(2556.0, mq.getRowCount(logicalSemiJoinOnLHSUniqueKeys))
+ assertEquals(2.0E7, mq.getRowCount(logicalSemiJoinOnRHSUniqueKeys))
+ assertEquals(1278.0, mq.getRowCount(logicalSemiJoinWithEquiAndNonEquiCond))
+ assertEquals(4.0E8, mq.getRowCount(logicalSemiJoinWithoutEquiCond))
+ assertEquals(8.0E8, mq.getRowCount(logicalSemiJoinOnDisjointKeys))
+
+ assertEquals(7.9999995E8, mq.getRowCount(logicalAntiJoinOnUniqueKeys))
+ assertEquals(8.0E7, mq.getRowCount(logicalAntiJoinNotOnUniqueKeys))
+ assertEquals(7.99997444E8, mq.getRowCount(logicalAntiJoinOnLHSUniqueKeys))
+ assertEquals(2000000.0, mq.getRowCount(logicalAntiJoinOnRHSUniqueKeys))
+ assertEquals(6.0E8, mq.getRowCount(logicalAntiJoinWithEquiAndNonEquiCond))
+ assertEquals(6.0E8, mq.getRowCount(logicalAntiJoinWithoutEquiCond))
+ assertEquals(8.0E7, mq.getRowCount(logicalAntiJoinOnDisjointKeys))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
index 9dfa3b5..c9b5f2f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
@@ -496,6 +496,18 @@ class FlinkRelMdSelectivityTest extends FlinkRelMdHandlerTestBase {
assertEquals(1D, mq.getSelectivity(join, pred3))
val pred4 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(3), relBuilder.literal(0))
assertEquals(0D, mq.getSelectivity(join, pred4))
+
+ assertEquals(3.125E-8, mq.getSelectivity(logicalSemiJoinOnUniqueKeys, pred1))
+ val pred5 = relBuilder.push(logicalSemiJoinNotOnUniqueKeys)
+ .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100000000L))
+ assertEquals(0.5, mq.getSelectivity(logicalSemiJoinNotOnUniqueKeys, pred5))
+
+ val pred6 = relBuilder.push(logicalAntiJoinWithoutEquiCond)
+ .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100L))
+ assertEquals(0.375, mq.getSelectivity(logicalAntiJoinWithoutEquiCond, pred6))
+ val pred7 = relBuilder.push(logicalAntiJoinNotOnUniqueKeys)
+ .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100000000L))
+ assertEquals(0.05, mq.getSelectivity(logicalAntiJoinNotOnUniqueKeys, pred7))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
index c94c907..134ab19 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
@@ -171,6 +171,11 @@ class FlinkRelMdSizeTest extends FlinkRelMdHandlerTestBase {
assertEquals(Seq(4.0, 8.0, 12.0, 88.8, 4.0, 4.0, 8.0, 12.0, 10.52, 4.0),
mq.getAverageColumnSizes(join).toList)
}
+
+ Array(logicalSemiJoinOnUniqueKeys, logicalAntiJoinNotOnUniqueKeys).foreach { join =>
+ assertEquals(Seq(4.0, 8.0, 12.0, 88.8, 4.0),
+ mq.getAverageColumnSizes(join).toList)
+ }
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
index 8417023..9bbea62 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
@@ -578,7 +578,50 @@ class FlinkRelMdUniqueGroupsTest extends FlinkRelMdHandlerTestBase {
// without equi join condition
assertEquals(ImmutableBitSet.of(1, 5, 6, 7, 8, 9),
- mq.getUniqueGroups(logicalFullWithoutCond, ImmutableBitSet.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)))
+ mq.getUniqueGroups(logicalFullJoinWithoutCond,
+ ImmutableBitSet.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)))
+
+ // semi join
+ // both left join keys and right join keys are unique
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // left join keys are not unique and right join keys are unique
+ assertEquals(ImmutableBitSet.of(0, 1, 2, 3, 4),
+ mq.getUniqueGroups(logicalSemiJoinOnRHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // left join keys are unique and right join keys are not unique
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalSemiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // neither left join keys nor right join keys are unique (non join columns have unique columns)
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // with non-equi join condition
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalSemiJoinWithEquiAndNonEquiCond, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // anti join
+ // both left join keys and right join keys are unique
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // left join keys are not unique and right join keys are unique
+ assertEquals(ImmutableBitSet.of(0, 1, 2, 3, 4),
+ mq.getUniqueGroups(logicalAntiJoinOnRHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // left join keys are unique and right join keys are not unique
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalAntiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // neither left join keys nor right join keys are unique (non join columns have unique columns)
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+ // with non-equi join condition
+ assertEquals(ImmutableBitSet.of(1),
+ mq.getUniqueGroups(logicalAntiJoinWithEquiAndNonEquiCond, ImmutableBitSet.of(0, 1, 2, 3, 4)))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 9e034a6..627599c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -226,6 +226,29 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnRHSUniqueKeys).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithoutEquiCond).toSet)
assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithEquiAndNonEquiCond).toSet)
+
+ assertEquals(uniqueKeys(Array(1)),
+ mq.getUniqueKeys(logicalSemiJoinOnUniqueKeys).toSet)
+ assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalSemiJoinNotOnUniqueKeys).toSet)
+ assertNull(mq.getUniqueKeys(logicalSemiJoinOnRHSUniqueKeys))
+ assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalSemiJoinWithoutEquiCond).toSet)
+ assertEquals(uniqueKeys(Array(1)),
+ mq.getUniqueKeys(logicalSemiJoinWithEquiAndNonEquiCond).toSet)
+
+ assertEquals(uniqueKeys(Array(1)),
+ mq.getUniqueKeys(logicalAntiJoinOnUniqueKeys).toSet)
+ assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalAntiJoinNotOnUniqueKeys).toSet)
+ assertNull(mq.getUniqueKeys(logicalAntiJoinOnRHSUniqueKeys))
+ assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalAntiJoinWithoutEquiCond).toSet)
+ assertEquals(uniqueKeys(Array(1)),
+ mq.getUniqueKeys(logicalAntiJoinWithEquiAndNonEquiCond).toSet)
+ }
+
+ @Test
+ def testGetUniqueKeysOnLookupJoin(): Unit = {
+ Array(batchLookupJoin, streamLookupJoin).foreach { join =>
+ assertEquals(uniqueKeys(), mq.getUniqueKeys(join).toSet)
+ }
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
index c63f299..bf7b3a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
@@ -179,6 +179,7 @@ object MetadataTestUtil {
new IntType(),
new TimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
+ val fieldNulls = fieldNames.map(_ => true)
val colStatsMap = Map[String, ColumnStats](
"a" -> new ColumnStats(30L, 0L, 4D, 4, 45, 5),
@@ -187,7 +188,7 @@ object MetadataTestUtil {
)
val tableStats = new TableStats(50L, colStatsMap)
- getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats),
+ getDataStreamTable(fieldNames, fieldTypes, fieldNulls, new FlinkStatistic(tableStats),
producesUpdates = false, isAccRetract = false)
}
@@ -199,6 +200,7 @@ object MetadataTestUtil {
new IntType(),
new TimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
+ val fieldNulls = fieldNames.map(_ => true)
val colStatsMap = Map[String, ColumnStats](
"a" -> new ColumnStats(50L, 0L, 8D, 8, 55, 5),
@@ -208,8 +210,8 @@ object MetadataTestUtil {
val tableStats = new TableStats(50L, colStatsMap)
val uniqueKeys = Set(Set("a").asJava).asJava
- getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats, uniqueKeys),
- producesUpdates = false, isAccRetract = false)
+ getDataStreamTable(fieldNames, fieldTypes, fieldNulls,
+ new FlinkStatistic(tableStats, uniqueKeys), producesUpdates = false, isAccRetract = false)
}
private def createTemporalTable3(): DataStreamTable[BaseRow] = {
@@ -220,6 +222,7 @@ object MetadataTestUtil {
new VarCharType(VarCharType.MAX_LENGTH),
new TimestampType(true, TimestampKind.PROCTIME, 3),
new TimestampType(true, TimestampKind.ROWTIME, 3))
+ val fieldNulls = fieldNames.map(_ => true)
val colStatsMap = Map[String, ColumnStats](
"a" -> new ColumnStats(3740000000L, 0L, 4D, 4, null, null),
@@ -228,7 +231,7 @@ object MetadataTestUtil {
)
val tableStats = new TableStats(4000000000L, colStatsMap)
- getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats),
+ getDataStreamTable(fieldNames, fieldTypes, fieldNulls, new FlinkStatistic(tableStats),
producesUpdates = false, isAccRetract = false)
}
@@ -237,13 +240,16 @@ object MetadataTestUtil {
statistic: FlinkStatistic,
producesUpdates: Boolean = false,
isAccRetract: Boolean = false): DataStreamTable[BaseRow] = {
+ val names = tableSchema.getFieldNames
val types = tableSchema.getFieldTypes.map(fromTypeInfoToLogicalType)
- getDataStreamTable(tableSchema.getFieldNames, types, statistic, producesUpdates, isAccRetract)
+ val nulls = Array.fill(tableSchema.getFieldCount)(true)
+ getDataStreamTable(names, types, nulls, statistic, producesUpdates, isAccRetract)
}
private def getDataStreamTable(
fieldNames: Array[String],
fieldTypes: Array[LogicalType],
+ fieldNullables: Array[Boolean],
statistic: FlinkStatistic,
producesUpdates: Boolean,
isAccRetract: Boolean): DataStreamTable[BaseRow] = {
@@ -256,7 +262,8 @@ object MetadataTestUtil {
isAccRetract,
fieldTypes.indices.toArray,
fieldNames,
- statistic)
+ statistic,
+ Some(fieldNullables))
}
}