You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/02 08:09:01 UTC

[flink] branch master updated: [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0119afa  [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join
0119afa is described below

commit 0119afaf28da2cf39e772b4fb568812ad09cfa79
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jun 1 17:44:31 2019 +0800

    [FLINK-12703][table-planner-blink] Introduce metadata handlers on SEMI/ANTI join and lookup join
    
    This closes #8588
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |  10 ++
 .../plan/metadata/FlinkRelMdColumnNullCount.scala  |  10 ++
 .../metadata/FlinkRelMdColumnOriginNullCount.scala |   4 +-
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  18 ++-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  13 +-
 .../flink/table/plan/metadata/FlinkRelMdSize.scala |   7 +-
 .../plan/metadata/FlinkRelMdUniqueGroups.scala     |   8 +-
 .../table/plan/metadata/FlinkRelMdUniqueKeys.scala |  19 ++-
 .../table/plan/batch/sql/join/LookupJoinTest.scala |   1 -
 .../metadata/FlinkRelMdColumnIntervalTest.scala    |  23 +++
 .../metadata/FlinkRelMdColumnNullCountTest.scala   |  14 ++
 .../FlinkRelMdColumnOriginNullCountTest.scala      |  10 +-
 .../metadata/FlinkRelMdColumnUniquenessTest.scala  |  37 +++++
 .../metadata/FlinkRelMdDistinctRowCountTest.scala  |  18 +++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 174 ++++++++++++++++++++-
 .../FlinkRelMdModifiedMonotonicityTest.scala       |   3 +
 .../FlinkRelMdPercentageOriginalRowsTest.scala     |   4 +
 .../metadata/FlinkRelMdPopulationSizeTest.scala    |  14 ++
 .../plan/metadata/FlinkRelMdRowCountTest.scala     |  17 ++
 .../plan/metadata/FlinkRelMdSelectivityTest.scala  |  12 ++
 .../table/plan/metadata/FlinkRelMdSizeTest.scala   |   5 +
 .../plan/metadata/FlinkRelMdUniqueGroupsTest.scala |  45 +++++-
 .../plan/metadata/FlinkRelMdUniqueKeysTest.scala   |  23 +++
 .../table/plan/metadata/MetadataTestUtil.scala     |  19 ++-
 24 files changed, 476 insertions(+), 32 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
index 20d4610..ed13607 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -101,6 +101,16 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
   }
 
   /**
+    * Gets interval of the given column on Snapshot.
+    *
+    * @param snapshot    Snapshot RelNode
+    * @param mq    RelMetadataQuery instance
+    * @param index the index of the given column
+    * @return interval of the given column on Snapshot.
+    */
+  def getColumnInterval(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): ValueInterval = null
+
+  /**
     * Gets interval of the given column on Project.
     *
     * Note: Only support the simple RexNode, e.g RexInputRef.
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
index ec7a59c..673f1cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCount.scala
@@ -68,6 +68,16 @@ class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount]
   }
 
   /**
+    * Gets the null count of the given column on Snapshot.
+    *
+    * @param snapshot    Snapshot RelNode
+    * @param mq    RelMetadataQuery instance
+    * @param index the index of the given column
+    * @return the null count of the given column on Snapshot.
+    */
+  def getColumnNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): JDouble = null
+
+  /**
     * Gets the null count of the given column in Project.
     *
     * @param project Project RelNode
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
index 6b88b9b..61bb21b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCount.scala
@@ -58,6 +58,8 @@ class FlinkRelMdColumnOriginNullCount private extends MetadataHandler[ColumnOrig
     }
   }
 
+  def getColumnOriginNullCount(snapshot: Snapshot, mq: RelMetadataQuery, index: Int): JDouble = null
+
   def getColumnOriginNullCount(rel: Project, mq: RelMetadataQuery, index: Int): JDouble = {
     getColumnOriginNullOnProjects(rel.getInput, rel.getProjects, mq, index)
   }
@@ -119,8 +121,6 @@ class FlinkRelMdColumnOriginNullCount private extends MetadataHandler[ColumnOrig
     }
   }
 
-  // TODO supports FlinkLogicalSnapshot
-
   def getColumnOriginNullCount(rel: RelNode, mq: RelMetadataQuery, index: Int): JDouble = null
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
index e6a2686..0afaa62 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
+import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
 import org.apache.flink.table.plan.nodes.logical._
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
@@ -498,6 +499,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
     )
   }
 
+  def areColumnsUnique(
+      join: CommonLookupJoin,
+      mq: RelMetadataQuery,
+      columns: ImmutableBitSet,
+      ignoreNulls: Boolean): JBoolean = {
+    val left = join.getInput
+    areColumnsUniqueOfJoin(
+      join.joinInfo, join.joinType, left.getRowType,
+      (leftSet: ImmutableBitSet) => mq.areColumnsUnique(left, leftSet, ignoreNulls),
+      // TODO get uniqueKeys from TableSchema of TableSource
+      (_: ImmutableBitSet) => null,
+      mq, columns
+    )
+  }
+
   def areColumnsUniqueOfJoin(
       joinInfo: JoinInfo,
       joinRelType: JoinRelType,
@@ -598,8 +614,6 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = null
 
-  // TODO supports temporal table join
-
   def areColumnsUnique(
       rel: SetOp,
       mq: RelMetadataQuery,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 914768e..3bba1e1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -315,7 +315,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
           val childMono = inputMonotonicity.fieldMonotonicities(aggCall.getArgList.head)
           val currentMono = fieldMonotonicities(index)
           if (childMono != currentMono &&
-            !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
+              !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
             // count will Increasing even child is NOT_MONOTONIC
             fieldMonotonicities(index) = NOT_MONOTONIC
           }
@@ -369,12 +369,16 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
   }
 
   def getRelModifiedMonotonicity(rel: Join, mq: RelMetadataQuery): RelModifiedMonotonicity = {
+    val joinType = rel.getJoinType
+    if (joinType.equals(JoinRelType.ANTI)) {
+      return null
+    }
+
     val left = rel.getLeft
     val right = rel.getRight
     val joinInfo = rel.analyzeCondition
     val leftKeys = joinInfo.leftKeys
     val rightKeys = joinInfo.rightKeys
-    val joinType = rel.getJoinType
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
 
     // if group set contains update return null
@@ -389,8 +393,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
     val isKeyAllAppend = isAllConstantOnKeys(left, leftKeys.toIntArray) &&
       isAllConstantOnKeys(right, rightKeys.toIntArray)
 
-    if (!containDelete && !joinType.equals(JoinRelType.ANTI) && isKeyAllAppend &&
-      (containUpdate && joinInfo.isEqui || !containUpdate)) {
+    if (!containDelete && isKeyAllAppend && (containUpdate && joinInfo.isEqui || !containUpdate)) {
 
       // output rowtype of semi equals to the rowtype of left child
       if (joinType.equals(JoinRelType.SEMI)) {
@@ -426,7 +429,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon
     getMonotonicity(rel.getInput(0), mq, rel.getRowType.getFieldCount)
   }
 
-  // TODO supports temporal table join
+  // TODO supports temporal table function join
 
   def getRelModifiedMonotonicity(rel: Union, mq: RelMetadataQuery): RelModifiedMonotonicity = {
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
index 5cd6ff3..3a68f7e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSize.scala
@@ -245,10 +245,9 @@ class FlinkRelMdSize private extends MetadataHandler[BuiltInMetadata.Size] {
 
   def averageColumnSizes(rel: Join, mq: RelMetadataQuery): JList[JDouble] = {
     val acsOfLeft = mq.getAverageColumnSizes(rel.getLeft)
-    val acsOfRight = if (rel.getJoinType.projectsRight) {
-      mq.getAverageColumnSizes(rel.getRight)
-    } else {
-      null
+    val acsOfRight = rel.getJoinType match {
+      case JoinRelType.SEMI | JoinRelType.ANTI => null
+      case _ => mq.getAverageColumnSizes(rel.getRight)
     }
     if (acsOfLeft == null && acsOfRight == null) {
       null
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
index 73a3adb..62ab472 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -341,11 +341,17 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] {
       mq: RelMetadataQuery,
       columns: ImmutableBitSet): ImmutableBitSet = {
     require(join.getSystemFieldList.isEmpty)
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    join.getJoinType match {
+      case JoinRelType.SEMI | JoinRelType.ANTI =>
+        return fmq.getUniqueGroups(join.getLeft, columns)
+      case _ => // do nothing
+    }
+
     val leftFieldCount = join.getLeft.getRowType.getFieldCount
     val (leftColumns, rightColumns) =
       FlinkRelMdUtil.splitColumnsIntoLeftAndRight(leftFieldCount, columns)
 
-    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
     val leftUniqueGroups = fmq.getUniqueGroups(join.getLeft, leftColumns)
     val rightUniqueGroups = fmq.getUniqueGroups(join.getRight, rightColumns)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
index 956c45d..84d66cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.metadata
 
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
+import org.apache.flink.table.plan.nodes.common.CommonLookupJoin
 import org.apache.flink.table.plan.nodes.logical._
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
@@ -49,7 +50,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
 
   def getDef: MetadataDef[BuiltInMetadata.UniqueKeys] = BuiltInMetadata.UniqueKeys.DEF
 
-
   def getUniqueKeys(
       rel: TableScan,
       mq: RelMetadataQuery,
@@ -410,6 +410,21 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
     getJoinUniqueKeys(joinInfo, rel.joinType, rel.getLeft, rel.getRight, mq, ignoreNulls)
   }
 
+  def getUniqueKeys(
+      join: CommonLookupJoin,
+      mq: RelMetadataQuery,
+      ignoreNulls: Boolean): util.Set[ImmutableBitSet] = {
+    val left = join.getInput
+    val leftUniqueKeys = mq.getUniqueKeys(left, ignoreNulls)
+    val leftType = left.getRowType
+    getJoinUniqueKeys(
+      join.joinInfo, join.joinType, leftType, leftUniqueKeys, null,
+      mq.areColumnsUnique(left, join.joinInfo.leftSet, ignoreNulls),
+      // TODO get uniqueKeys from TableSchema of TableSource
+      null,
+      mq)
+  }
+
   private def getJoinUniqueKeys(
       joinInfo: JoinInfo,
       joinRelType: JoinRelType,
@@ -488,8 +503,6 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
     retSet
   }
 
-  // TODO supports temporal table join
-
   def getUniqueKeys(
       rel: Correlate,
       mq: RelMetadataQuery,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 8bc5cc2..1d269bf 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -84,7 +84,6 @@ class LookupJoinTest extends TableTestBase {
     )
   }
 
-
   @Test
   def testLogicalPlan(): Unit = {
     val sql1 =
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index bbb5ba3..6684ab8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -86,6 +86,13 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   }
 
   @Test
+  def testGetColumnIntervalOnSnapshot(): Unit = {
+    (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+      assertNull(mq.getColumnInterval(flinkLogicalSnapshot, idx))
+    }
+  }
+
+  @Test
   def testGetColumnIntervalOnProject(): Unit = {
     assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalProject, 0))
     assertNull(mq.getColumnInterval(logicalProject, 1))
@@ -525,6 +532,22 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     assertEquals(ValueInterval(8L, 1000L), mq.getColumnInterval(join, 6))
     assertNull(mq.getColumnInterval(join, 7))
     assertNull(mq.getColumnInterval(join, 8))
+
+    assertEquals(ValueInterval(0, null, includeLower = true),
+      mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 0))
+    assertEquals(ValueInterval(1L, 800000000L),
+      mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 1))
+    assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 2))
+    assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 3))
+    assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
+
+    assertEquals(ValueInterval(0, null, includeLower = true),
+      mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 0))
+    assertEquals(ValueInterval(1L, 800000000L),
+      mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 1))
+    assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 2))
+    assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 3))
+    assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
index 7eaf02e..e227a73 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnNullCountTest.scala
@@ -52,6 +52,13 @@ class FlinkRelMdColumnNullCountTest extends FlinkRelMdHandlerTestBase {
   }
 
   @Test
+  def testGetColumnIntervalOnSnapshot(): Unit = {
+    (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+      assertNull(mq.getColumnNullCount(flinkLogicalSnapshot, idx))
+    }
+  }
+
+  @Test
   def testGetColumnNullCountOnProject(): Unit = {
     assertEquals(0.0, mq.getColumnNullCount(logicalProject, 0))
     assertEquals(0.0, mq.getColumnNullCount(logicalProject, 1))
@@ -262,6 +269,13 @@ class FlinkRelMdColumnNullCountTest extends FlinkRelMdHandlerTestBase {
     (0 until fullJoin.getRowType.getFieldCount).foreach { idx =>
       assertNull(mq.getColumnNullCount(fullJoin, idx))
     }
+
+    // semi/anti join
+    Array(logicalSemiJoinWithEquiAndNonEquiCond, logicalAntiJoinWithoutEquiCond).foreach { join =>
+      (0 until join.getRowType.getFieldCount).foreach { idx =>
+        assertNull(mq.getColumnNullCount(fullJoin, idx))
+      }
+    }
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
index 40a11a9..1d6f86b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnOriginNullCountTest.scala
@@ -46,6 +46,13 @@ class FlinkRelMdColumnOriginNullCountTest extends FlinkRelMdHandlerTestBase {
   }
 
   @Test
+  def testGetColumnOriginNullCountOnSnapshot(): Unit = {
+    (0 until flinkLogicalSnapshot.getRowType.getFieldCount).foreach { idx =>
+      assertNull(mq.getColumnOriginNullCount(flinkLogicalSnapshot, idx))
+    }
+  }
+
+  @Test
   def testGetColumnOriginNullCountOnProject(): Unit = {
     assertEquals(0.0, mq.getColumnOriginNullCount(logicalProject, 0))
     assertEquals(0.0, mq.getColumnOriginNullCount(logicalProject, 1))
@@ -127,7 +134,8 @@ class FlinkRelMdColumnOriginNullCountTest extends FlinkRelMdHandlerTestBase {
     assertEquals(0.0, mq.getColumnOriginNullCount(innerJoin2, 3))
 
     Array(logicalLeftJoinOnUniqueKeys, logicalRightJoinNotOnUniqueKeys,
-      logicalFullJoinWithEquiAndNonEquiCond).foreach { join =>
+      logicalFullJoinWithEquiAndNonEquiCond, logicalSemiJoinNotOnUniqueKeys,
+      logicalSemiJoinWithEquiAndNonEquiCond).foreach { join =>
       (0 until join.getRowType.getFieldCount).foreach { idx =>
         assertNull(mq.getColumnOriginNullCount(join, idx))
       }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
index 0bf706b..6929247 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
@@ -496,6 +496,43 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
     assertFalse(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(1, 6)))
     assertFalse(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(5, 6)))
     assertTrue(mq.areColumnsUnique(logicalFullJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 5, 6)))
+
+    // semi/anti join
+    Array(logicalSemiJoinOnUniqueKeys, logicalSemiJoinNotOnUniqueKeys,
+      logicalSemiJoinOnDisjointKeys, logicalAntiJoinOnUniqueKeys, logicalAntiJoinNotOnUniqueKeys,
+      logicalAntiJoinOnDisjointKeys).foreach { join =>
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(0)))
+      assertTrue(mq.areColumnsUnique(join, ImmutableBitSet.of(1)))
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(2)))
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(3)))
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(4)))
+      assertTrue(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 1)))
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 2)))
+    }
+  }
+
+  @Test
+  def testAreColumnsUniqueOnLookupJoin(): Unit = {
+    Array(batchLookupJoin, streamLookupJoin).foreach { join =>
+      assertFalse(mq.areColumnsUnique(join, ImmutableBitSet.of()))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(2)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(3)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(4)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(5)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(6)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(7)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(8)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(9)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 1)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1, 2)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 7)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(1, 7)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(0, 8)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(7, 8)))
+      assertNull(mq.areColumnsUnique(join, ImmutableBitSet.of(8, 9)))
+    }
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
index ec7dda4..17a14e8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdDistinctRowCountTest.scala
@@ -587,6 +587,24 @@ class FlinkRelMdDistinctRowCountTest extends FlinkRelMdHandlerTestBase {
       mq.getDistinctRowCount(logicalFullJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null))
     assertEquals(505696447.06,
       mq.getDistinctRowCount(logicalFullJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null), 1e-2)
+
+    assertEquals(50,
+      mq.getDistinctRowCount(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0), null), 1e-2)
+    assertEquals(50,
+      mq.getDistinctRowCount(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(1), null), 1e-2)
+    assertEquals(2.0E7,
+      mq.getDistinctRowCount(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null))
+    assertEquals(8.0E8,
+      mq.getDistinctRowCount(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null))
+
+    assertEquals(2.0E7,
+      mq.getDistinctRowCount(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0), null))
+    assertEquals(7.9999995E8,
+      mq.getDistinctRowCount(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(1), null))
+    assertEquals(1.970438234E7,
+      mq.getDistinctRowCount(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0), null), 1e-2)
+    assertEquals(8.0E7,
+      mq.getDistinctRowCount(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(1), null))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 0d07727..0661f03 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.plan.nodes.logical._
 import org.apache.flink.table.plan.nodes.physical.batch._
 import org.apache.flink.table.plan.nodes.physical.stream._
 import org.apache.flink.table.plan.schema.FlinkRelOptTable
+import org.apache.flink.table.plan.stream.sql.join.TestTemporalTable
 import org.apache.flink.table.plan.util.AggregateUtil.transformToStreamAggregateInfoList
 import org.apache.flink.table.plan.util._
 import org.apache.flink.table.planner.PlannerContext
@@ -50,18 +51,18 @@ import org.apache.calcite.plan._
 import org.apache.calcite.prepare.CalciteCatalogReader
 import org.apache.calcite.rel._
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
-import org.apache.calcite.rel.core._
-import org.apache.calcite.rel.logical._
+import org.apache.calcite.rel.core.{AggregateCall, Calc, JoinInfo, JoinRelType, Project, Window}
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject, LogicalSort, LogicalTableScan, LogicalValues}
 import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQuery}
 import org.apache.calcite.rex._
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.sql.SqlWindow
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun.SqlStdOperatorTable._
+import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, BOOLEAN, DATE, DOUBLE, FLOAT, TIME, TIMESTAMP, VARCHAR}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{AND, CASE, DIVIDE, EQUALS, GREATER_THAN, LESS_THAN, MINUS, MULTIPLY, OR, PLUS}
 import org.apache.calcite.sql.fun.{SqlCountAggFunction, SqlStdOperatorTable}
 import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.util.{DateString, ImmutableBitSet, TimeString, TimestampString}
+import org.apache.calcite.util.{DateString, ImmutableBitSet, ImmutableIntList, TimeString, TimestampString}
 import org.junit.{Before, BeforeClass}
 
 import java.math.BigDecimal
@@ -1770,6 +1771,46 @@ class FlinkRelMdHandlerTestBase {
     )
   }
 
+  protected lazy val flinkLogicalSnapshot: FlinkLogicalSnapshot = {
+    new FlinkLogicalSnapshot(
+      cluster,
+      flinkLogicalTraits,
+      studentFlinkLogicalScan,
+      relBuilder.call(FlinkSqlOperatorTable.PROCTIME))
+  }
+
+  // SELECT * FROM student AS T JOIN TemporalTable
+  // FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+  protected lazy val (batchLookupJoin, streamLookupJoin) = {
+    val temporalTableSource = new TestTemporalTable
+    val temporalTableRowType = typeFactory.builder()
+        .add("id", SqlTypeName.INTEGER)
+        .add("name", SqlTypeName.VARCHAR)
+        .add("age", SqlTypeName.INTEGER)
+        .build()
+    val batchLookupJoin = new BatchExecLookupJoin(
+      cluster,
+      batchPhysicalTraits,
+      studentBatchScan,
+      temporalTableSource,
+      temporalTableRowType,
+      None,
+      JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)),
+      JoinRelType.INNER
+    )
+    val streamLookupJoin = new StreamExecLookupJoin(
+      cluster,
+      streamPhysicalTraits,
+      studentBatchScan,
+      temporalTableSource,
+      temporalTableRowType,
+      None,
+      JoinInfo.of(ImmutableIntList.of(0), ImmutableIntList.of(0)),
+      JoinRelType.INNER
+    )
+    (batchLookupJoin, streamLookupJoin)
+  }
+
   // select * from MyTable1 join MyTable4 on MyTable1.b = MyTable4.a
   protected lazy val logicalInnerJoinOnUniqueKeys: RelNode = relBuilder
     .scan("MyTable1")
@@ -2002,12 +2043,133 @@ class FlinkRelMdHandlerTestBase {
     .build
 
   // select * from MyTable1 full join MyTable2 on true
-  protected lazy val logicalFullWithoutCond: RelNode = relBuilder
+  protected lazy val logicalFullJoinWithoutCond: RelNode = relBuilder
     .scan("MyTable1")
     .scan("MyTable2")
     .join(JoinRelType.FULL, relBuilder.literal(true))
     .build
 
+  // select * from MyTable1 where b in (select a from MyTable4)
+  protected lazy val logicalSemiJoinOnUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable4")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 0)))
+    .build()
+
+  // select * from MyTable1 where a in (select a from MyTable2)
+  protected lazy val logicalSemiJoinNotOnUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+    .build()
+
+  // select * from MyTable1 where b in (select b from MyTable2)
+  protected lazy val logicalSemiJoinOnLHSUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+    .build()
+
+  // select * from MyTable2 where b in (select b from MyTable1)
+  protected lazy val logicalSemiJoinOnRHSUniqueKeys: RelNode = relBuilder
+    .scan("MyTable2")
+    .scan("MyTable1")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+    .build()
+
+  // select * from MyTable1 where b in (select b from MyTable2 where MyTable1.a > MyTable2.a)
+  protected lazy val logicalSemiJoinWithEquiAndNonEquiCond: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.SEMI, relBuilder.call(AND,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+      relBuilder.call(GREATER_THAN, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0))))
+    .build
+
+  // select * from MyTable1 where exists (select * from MyTable2 where MyTable1.a > MyTable2.a)
+  protected lazy val logicalSemiJoinWithoutEquiCond: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(GREATER_THAN, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+    .build()
+
+  // select * from MyTable1 where e in (select e from MyTable2)
+  protected lazy val logicalSemiJoinOnDisjointKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.SEMI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 4), relBuilder.field(2, 1, 4)))
+    .build
+
+  // select * from MyTable1 where not exists (select * from MyTable4 where MyTable1.b = MyTable4.a)
+  protected lazy val logicalAntiJoinOnUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable4")
+    .join(JoinRelType.ANTI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 0)))
+    .build()
+
+  // select * from MyTable1 where not exists (select * from MyTable2 where MyTable1.a = MyTable2.a)
+  protected lazy val logicalAntiJoinNotOnUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.ANTI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0)))
+    .build()
+
+  // select * from MyTable1 where not exists (select * from MyTable2 where MyTable1.b = MyTable2.b)
+  protected lazy val logicalAntiJoinOnLHSUniqueKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.ANTI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+    .build()
+
+  // select * from MyTable2 where not exists (select * from MyTable1 where MyTable1.b = MyTable2.b)
+  protected lazy val logicalAntiJoinOnRHSUniqueKeys: RelNode = relBuilder
+    .scan("MyTable2")
+    .scan("MyTable1")
+    .join(JoinRelType.ANTI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))
+    .build()
+
+  // select * from MyTable1 where b not in (select b from MyTable2 where MyTable1.a = MyTable2.a)
+  // notes: the nullable of b is true
+  protected lazy val logicalAntiJoinWithEquiAndNonEquiCond: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.ANTI, relBuilder.call(AND,
+      relBuilder.call(OR,
+        relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+        relBuilder.isNull(
+          relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))),
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 0))))
+    .build
+
+  // select * from MyTable1 where b not in (select b from MyTable2)
+  // notes: the nullable of b is true
+  protected lazy val logicalAntiJoinWithoutEquiCond: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.ANTI, relBuilder.call(OR,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)),
+      relBuilder.isNull(
+        relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 1, 1)))))
+    .build
+
+  // select * from MyTable1 where not exists (select e from MyTable2 where MyTable1.e = MyTable2.e)
+  protected lazy val logicalAntiJoinOnDisjointKeys: RelNode = relBuilder
+    .scan("MyTable1")
+    .scan("MyTable2")
+    .join(JoinRelType.ANTI,
+      relBuilder.call(EQUALS, relBuilder.field(2, 0, 4), relBuilder.field(2, 1, 4)))
+    .build
+
   // SELECT * FROM MyTable1 UNION ALL SELECT * MyTable2
   protected lazy val logicalUnionAll: RelNode = relBuilder
     .scan("MyTable1")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
index 11a414c..c62a5c0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdModifiedMonotonicityTest.scala
@@ -281,6 +281,9 @@ class FlinkRelMdModifiedMonotonicityTest extends FlinkRelMdHandlerTestBase {
     val join3 = relBuilder.push(left).push(right).join(JoinRelType.INNER,
       relBuilder.call(EQUALS, relBuilder.field(2, 0, 0), relBuilder.field(2, 1, 1))).build()
     assertEquals(null, mq.getRelModifiedMonotonicity(join3))
+
+    assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinNotOnUniqueKeys))
+    assertNull(mq.getRelModifiedMonotonicity(logicalAntiJoinOnUniqueKeys))
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
index 07604ed..ade834d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPercentageOriginalRowsTest.scala
@@ -69,6 +69,10 @@ class FlinkRelMdPercentageOriginalRowsTest extends FlinkRelMdHandlerTestBase {
     assertEquals(1.0, mq.getPercentageOriginalRows(logicalRightJoinOnDisjointKeys))
     assertEquals(1.0, mq.getPercentageOriginalRows(logicalFullJoinOnUniqueKeys))
     assertEquals(1.0, mq.getPercentageOriginalRows(logicalFullJoinNotOnUniqueKeys))
+    assertEquals(1.0, mq.getPercentageOriginalRows(logicalSemiJoinOnUniqueKeys))
+    assertEquals(1.0, mq.getPercentageOriginalRows(logicalSemiJoinNotOnUniqueKeys))
+    assertEquals(1.0, mq.getPercentageOriginalRows(logicalAntiJoinOnUniqueKeys))
+    assertEquals(1.0, mq.getPercentageOriginalRows(logicalAntiJoinNotOnUniqueKeys))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
index 7147b16..d520178 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdPopulationSizeTest.scala
@@ -319,6 +319,20 @@ class FlinkRelMdPopulationSizeTest extends FlinkRelMdHandlerTestBase {
       mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(1, 5)))
     assertEquals(5.112E10,
       mq.getPopulationSize(logicalFullJoinWithoutEquiCond, ImmutableBitSet.of(0, 6)))
+
+    assertEquals(1.0, mq.getPopulationSize(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of()))
+    assertEquals(2.0E7, mq.getPopulationSize(logicalSemiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0)))
+    assertEquals(8.0E8, mq.getPopulationSize(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(1)))
+    assertEquals(8.0E8, mq.getPopulationSize(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1)))
+    assertEquals(8.0E8,
+      mq.getPopulationSize(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 2)))
+
+    assertEquals(1.0, mq.getPopulationSize(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of()))
+    assertEquals(2.0E7, mq.getPopulationSize(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0)))
+    assertEquals(8.0E8, mq.getPopulationSize(logicalAntiJoinOnLHSUniqueKeys, ImmutableBitSet.of(1)))
+    assertEquals(8.0E8, mq.getPopulationSize(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1)))
+    assertEquals(8.0E8,
+      mq.getPopulationSize(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 2)))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
index 61fec6d..0351b8b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdRowCountTest.scala
@@ -204,10 +204,27 @@ class FlinkRelMdRowCountTest extends FlinkRelMdHandlerTestBase {
 
     assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnUniqueKeys))
     assertEquals(8.0E8, mq.getRowCount(logicalFullJoinNotOnUniqueKeys))
+    assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnLHSUniqueKeys))
     assertEquals(8.0E8, mq.getRowCount(logicalFullJoinOnRHSUniqueKeys))
     assertEquals(8.1E8, mq.getRowCount(logicalFullJoinWithEquiAndNonEquiCond))
     assertEquals(8.0E15, mq.getRowCount(logicalFullJoinWithoutEquiCond))
     assertEquals(8.2E8, mq.getRowCount(logicalFullJoinOnDisjointKeys))
+
+    assertEquals(50.0, mq.getRowCount(logicalSemiJoinOnUniqueKeys))
+    assertEquals(8.0E8, mq.getRowCount(logicalSemiJoinNotOnUniqueKeys))
+    assertEquals(2556.0, mq.getRowCount(logicalSemiJoinOnLHSUniqueKeys))
+    assertEquals(2.0E7, mq.getRowCount(logicalSemiJoinOnRHSUniqueKeys))
+    assertEquals(1278.0, mq.getRowCount(logicalSemiJoinWithEquiAndNonEquiCond))
+    assertEquals(4.0E8, mq.getRowCount(logicalSemiJoinWithoutEquiCond))
+    assertEquals(8.0E8, mq.getRowCount(logicalSemiJoinOnDisjointKeys))
+
+    assertEquals(7.9999995E8, mq.getRowCount(logicalAntiJoinOnUniqueKeys))
+    assertEquals(8.0E7, mq.getRowCount(logicalAntiJoinNotOnUniqueKeys))
+    assertEquals(7.99997444E8, mq.getRowCount(logicalAntiJoinOnLHSUniqueKeys))
+    assertEquals(2000000.0, mq.getRowCount(logicalAntiJoinOnRHSUniqueKeys))
+    assertEquals(6.0E8, mq.getRowCount(logicalAntiJoinWithEquiAndNonEquiCond))
+    assertEquals(6.0E8, mq.getRowCount(logicalAntiJoinWithoutEquiCond))
+    assertEquals(8.0E7, mq.getRowCount(logicalAntiJoinOnDisjointKeys))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
index 9dfa3b5..c9b5f2f 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSelectivityTest.scala
@@ -496,6 +496,18 @@ class FlinkRelMdSelectivityTest extends FlinkRelMdHandlerTestBase {
     assertEquals(1D, mq.getSelectivity(join, pred3))
     val pred4 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(3), relBuilder.literal(0))
     assertEquals(0D, mq.getSelectivity(join, pred4))
+
+    assertEquals(3.125E-8, mq.getSelectivity(logicalSemiJoinOnUniqueKeys, pred1))
+    val pred5 = relBuilder.push(logicalSemiJoinNotOnUniqueKeys)
+        .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100000000L))
+    assertEquals(0.5, mq.getSelectivity(logicalSemiJoinNotOnUniqueKeys, pred5))
+
+    val pred6 = relBuilder.push(logicalAntiJoinWithoutEquiCond)
+        .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100L))
+    assertEquals(0.375, mq.getSelectivity(logicalAntiJoinWithoutEquiCond, pred6))
+    val pred7 = relBuilder.push(logicalAntiJoinNotOnUniqueKeys)
+        .call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(100000000L))
+    assertEquals(0.05, mq.getSelectivity(logicalAntiJoinNotOnUniqueKeys, pred7))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
index c94c907..134ab19 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdSizeTest.scala
@@ -171,6 +171,11 @@ class FlinkRelMdSizeTest extends FlinkRelMdHandlerTestBase {
       assertEquals(Seq(4.0, 8.0, 12.0, 88.8, 4.0, 4.0, 8.0, 12.0, 10.52, 4.0),
         mq.getAverageColumnSizes(join).toList)
     }
+
+    Array(logicalSemiJoinOnUniqueKeys, logicalAntiJoinNotOnUniqueKeys).foreach { join =>
+      assertEquals(Seq(4.0, 8.0, 12.0, 88.8, 4.0),
+        mq.getAverageColumnSizes(join).toList)
+    }
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
index 8417023..9bbea62 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueGroupsTest.scala
@@ -578,7 +578,50 @@ class FlinkRelMdUniqueGroupsTest extends FlinkRelMdHandlerTestBase {
 
     // without equi join condition
     assertEquals(ImmutableBitSet.of(1, 5, 6, 7, 8, 9),
-      mq.getUniqueGroups(logicalFullWithoutCond, ImmutableBitSet.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)))
+      mq.getUniqueGroups(logicalFullJoinWithoutCond,
+        ImmutableBitSet.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)))
+
+    // semi join
+    // both left join keys and right join keys are unique
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalSemiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // left join keys are not unique and right join keys are unique
+    assertEquals(ImmutableBitSet.of(0, 1, 2, 3, 4),
+      mq.getUniqueGroups(logicalSemiJoinOnRHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // left join keys are unique and right join keys are not unique
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalSemiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // neither left join keys nor right join keys are unique (non join columns have unique columns)
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalSemiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // with non-equi join condition
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalSemiJoinWithEquiAndNonEquiCond, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // anti join
+    // both left join keys and right join keys are unique
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalAntiJoinOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // left join keys are not unique and right join keys are unique
+    assertEquals(ImmutableBitSet.of(0, 1, 2, 3, 4),
+      mq.getUniqueGroups(logicalAntiJoinOnRHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // left join keys are unique and right join keys are not unique
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalAntiJoinOnLHSUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // neither left join keys nor right join keys are unique (non join columns have unique columns)
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalAntiJoinNotOnUniqueKeys, ImmutableBitSet.of(0, 1, 2, 3, 4)))
+
+    // with non-equi join condition
+    assertEquals(ImmutableBitSet.of(1),
+      mq.getUniqueGroups(logicalAntiJoinWithEquiAndNonEquiCond, ImmutableBitSet.of(0, 1, 2, 3, 4)))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 9e034a6..627599c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -226,6 +226,29 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
     assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinOnRHSUniqueKeys).toSet)
     assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithoutEquiCond).toSet)
     assertEquals(uniqueKeys(), mq.getUniqueKeys(logicalFullJoinWithEquiAndNonEquiCond).toSet)
+
+    assertEquals(uniqueKeys(Array(1)),
+      mq.getUniqueKeys(logicalSemiJoinOnUniqueKeys).toSet)
+    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalSemiJoinNotOnUniqueKeys).toSet)
+    assertNull(mq.getUniqueKeys(logicalSemiJoinOnRHSUniqueKeys))
+    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalSemiJoinWithoutEquiCond).toSet)
+    assertEquals(uniqueKeys(Array(1)),
+      mq.getUniqueKeys(logicalSemiJoinWithEquiAndNonEquiCond).toSet)
+
+    assertEquals(uniqueKeys(Array(1)),
+      mq.getUniqueKeys(logicalAntiJoinOnUniqueKeys).toSet)
+    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalAntiJoinNotOnUniqueKeys).toSet)
+    assertNull(mq.getUniqueKeys(logicalAntiJoinOnRHSUniqueKeys))
+    assertEquals(uniqueKeys(Array(1)), mq.getUniqueKeys(logicalAntiJoinWithoutEquiCond).toSet)
+    assertEquals(uniqueKeys(Array(1)),
+      mq.getUniqueKeys(logicalAntiJoinWithEquiAndNonEquiCond).toSet)
+  }
+
+  @Test
+  def testGetUniqueKeysOnLookupJoin(): Unit = {
+    Array(batchLookupJoin, streamLookupJoin).foreach { join =>
+      assertEquals(uniqueKeys(), mq.getUniqueKeys(join).toSet)
+    }
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
index c63f299..bf7b3a9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/metadata/MetadataTestUtil.scala
@@ -179,6 +179,7 @@ object MetadataTestUtil {
       new IntType(),
       new TimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
+    val fieldNulls = fieldNames.map(_ => true)
 
     val colStatsMap = Map[String, ColumnStats](
       "a" -> new ColumnStats(30L, 0L, 4D, 4, 45, 5),
@@ -187,7 +188,7 @@ object MetadataTestUtil {
     )
 
     val tableStats = new TableStats(50L, colStatsMap)
-    getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats),
+    getDataStreamTable(fieldNames, fieldTypes, fieldNulls, new FlinkStatistic(tableStats),
       producesUpdates = false, isAccRetract = false)
   }
 
@@ -199,6 +200,7 @@ object MetadataTestUtil {
       new IntType(),
       new TimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
+    val fieldNulls = fieldNames.map(_ => true)
 
     val colStatsMap = Map[String, ColumnStats](
       "a" -> new ColumnStats(50L, 0L, 8D, 8, 55, 5),
@@ -208,8 +210,8 @@ object MetadataTestUtil {
 
     val tableStats = new TableStats(50L, colStatsMap)
     val uniqueKeys = Set(Set("a").asJava).asJava
-    getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats, uniqueKeys),
-      producesUpdates = false, isAccRetract = false)
+    getDataStreamTable(fieldNames, fieldTypes, fieldNulls,
+      new FlinkStatistic(tableStats, uniqueKeys), producesUpdates = false, isAccRetract = false)
   }
 
   private def createTemporalTable3(): DataStreamTable[BaseRow] = {
@@ -220,6 +222,7 @@ object MetadataTestUtil {
       new VarCharType(VarCharType.MAX_LENGTH),
       new TimestampType(true, TimestampKind.PROCTIME, 3),
       new TimestampType(true, TimestampKind.ROWTIME, 3))
+    val fieldNulls = fieldNames.map(_ => true)
 
     val colStatsMap = Map[String, ColumnStats](
       "a" -> new ColumnStats(3740000000L, 0L, 4D, 4, null, null),
@@ -228,7 +231,7 @@ object MetadataTestUtil {
     )
 
     val tableStats = new TableStats(4000000000L, colStatsMap)
-    getDataStreamTable(fieldNames, fieldTypes, new FlinkStatistic(tableStats),
+    getDataStreamTable(fieldNames, fieldTypes, fieldNulls, new FlinkStatistic(tableStats),
       producesUpdates = false, isAccRetract = false)
   }
 
@@ -237,13 +240,16 @@ object MetadataTestUtil {
       statistic: FlinkStatistic,
       producesUpdates: Boolean = false,
       isAccRetract: Boolean = false): DataStreamTable[BaseRow] = {
+    val names = tableSchema.getFieldNames
     val types = tableSchema.getFieldTypes.map(fromTypeInfoToLogicalType)
-    getDataStreamTable(tableSchema.getFieldNames, types, statistic, producesUpdates, isAccRetract)
+    val nulls = Array.fill(tableSchema.getFieldCount)(true)
+    getDataStreamTable(names, types, nulls, statistic, producesUpdates, isAccRetract)
   }
 
   private def getDataStreamTable(
       fieldNames: Array[String],
       fieldTypes: Array[LogicalType],
+      fieldNullables: Array[Boolean],
       statistic: FlinkStatistic,
       producesUpdates: Boolean,
       isAccRetract: Boolean): DataStreamTable[BaseRow] = {
@@ -256,7 +262,8 @@ object MetadataTestUtil {
       isAccRetract,
       fieldTypes.indices.toArray,
       fieldNames,
-      statistic)
+      statistic,
+      Some(fieldNullables))
   }
 
 }