Posted to commits@flink.apache.org by go...@apache.org on 2021/12/07 04:23:55 UTC

[flink] branch master updated: [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ac957ae  [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
ac957ae is described below

commit ac957ae07a49ab88ad46a14c6ea0d4891d5437d7
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Nov 30 11:59:04 2021 +0100

    [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
    
    This closes #17962
    
    Co-authored-by: guanghxu <xu...@gmail.com>
---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  37 +-----
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       |   1 -
 .../metadata/FlinkRelMdColumnUniquenessTest.scala  |  47 +++++++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  | 105 +++++++++++++++-
 .../plan/metadata/FlinkRelMdUniqueKeysTest.scala   |  16 +++
 .../planner/plan/metadata/MetadataTestUtil.scala   | 135 +++++++++++++++++----
 6 files changed, 282 insertions(+), 59 deletions(-)
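
In short: the column-uniqueness handler no longer pattern-matches on
FlinkPreparingTableBase (or the legacy TableSource scan) to look up unique
keys. It now asks the metadata query itself (mq.getUniqueKeys) and reports a
column set as unique when it covers one of the returned keys, falling back to
RelOptTable.isKey; a null key set still means "unknown". A small sketch of
that rule follows the first diff below.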

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 6d7382c..241f529 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -23,16 +23,12 @@ import org.apache.flink.table.planner.JBoolean
 import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
-import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.batch._
 import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
 import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil}
 import org.apache.flink.table.runtime.operators.rank.RankType
-import org.apache.flink.table.sources.TableSource
 
-import org.apache.calcite.plan.RelOptTable
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.Converter
@@ -61,42 +57,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = {
-    areTableColumnsUnique(rel, null, rel.getTable, columns)
-  }
-
-  def areColumnsUnique(
-      rel: FlinkLogicalLegacyTableSourceScan,
-      mq: RelMetadataQuery,
-      columns: ImmutableBitSet,
-      ignoreNulls: Boolean): JBoolean = {
-    areTableColumnsUnique(rel, rel.tableSource, rel.getTable, columns)
+    areTableColumnsUnique(rel, mq.getUniqueKeys(rel, ignoreNulls), columns)
   }
 
   private def areTableColumnsUnique(
       rel: TableScan,
-      tableSource: TableSource[_],
-      relOptTable: RelOptTable,
+      uniqueKeys: util.Set[ImmutableBitSet],
       columns: ImmutableBitSet): JBoolean = {
     if (columns.cardinality == 0) {
       return false
     }
 
-    // TODO get uniqueKeys from TableSchema of TableSource
-
-    relOptTable match {
-      case table: FlinkPreparingTableBase => {
-        val ukOptional = table.uniqueKeysSet
-        if (ukOptional.isPresent) {
-          if (ukOptional.get().isEmpty) {
-            false
-          } else {
-            ukOptional.get().exists(columns.contains)
-          }
-        } else {
-          null
-        }
-      }
-      case _ => rel.getTable.isKey(columns)
+    if (uniqueKeys != null) {
+      uniqueKeys.exists(columns.contains) || rel.getTable.isKey(columns)
+    } else {
+      null
     }
   }
 
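The heart of the new check is a superset test: a column set is unique iff it
contains every column of some known unique key. A minimal standalone sketch
of that rule (not part of the commit; it only assumes calcite-core's
ImmutableBitSet on the classpath):

    import org.apache.calcite.util.ImmutableBitSet

    object UniquenessRuleSketch {
      // ImmutableBitSet.contains(other) is a superset test, mirroring
      // `uniqueKeys.exists(columns.contains)` in the handler above.
      def areColumnsUnique(
          uniqueKeys: Set[ImmutableBitSet],
          columns: ImmutableBitSet): Boolean =
        uniqueKeys.exists(key => columns.contains(key))

      def main(args: Array[String]): Unit = {
        val keys = Set(ImmutableBitSet.of(0, 1)) // e.g. a primary key on columns 0 and 1
        println(areColumnsUnique(keys, ImmutableBitSet.of(0, 1, 2))) // true
        println(areColumnsUnique(keys, ImmutableBitSet.of(1, 2)))    // false
      }
    }

Note the real handler additionally falls back to rel.getTable.isKey(columns)
and returns null (unknown) when the metadata query yields no key set at all.
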
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 4a76839..63547eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -45,7 +45,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
 
 import java.util
-import java.util.Set
 
 import scala.collection.JavaConversions._
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
index 31c8cf0..4e545d3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
@@ -622,4 +622,51 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
     }
   }
 
+  @Test
+  def testAreColumnsUniqueOnTableSourceTable(): Unit = {
+    Array(
+      tableSourceTableLogicalScan,
+      tableSourceTableFlinkLogicalScan,
+      tableSourceTableBatchScan,
+      tableSourceTableStreamScan
+    )
+      .foreach { scan =>
+        assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(1, 2)))
+        assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+      }
+  }
+
+  @Test
+  def testAreColumnsUniqueOnTablePartiallyProjectedKey(): Unit = {
+    Array(
+      tablePartiallyProjectedKeyLogicalScan,
+      tablePartiallyProjectedKeyFlinkLogicalScan,
+      tablePartiallyProjectedKeyBatchScan,
+      tablePartiallyProjectedKeyStreamScan
+    )
+      .foreach { scan =>
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+      }
+  }
+
+  @Test
+  def testAreColumnsUniqueOnTableSourceTableNonKey(): Unit = {
+    Array(
+      tableSourceTableNonKeyLogicalScan,
+      tableSourceTableNonKeyFlinkLogicalScan,
+      tableSourceTableNonKeyBatchScan,
+      tableSourceTableNonKeyStreamScan
+    )
+      .foreach { scan =>
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
+        assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+      }
+  }
 }
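
For context, these expectations follow from the fixtures added to
MetadataTestUtil below: TableSourceTable1 declares a primary key on (a, b),
i.e. column indices {0, 1}, so exactly the column sets covering {0, 1} are
unique; projected_table_source_table_with_partial_pk projects away part of
its composite key (a, b), so no remaining column set can be unique; and
TableSourceTable3 declares no key at all, so every check returns false.
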
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 3a3acc8..f935697 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.batch._
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable}
+import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
 import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.utils.Top3
@@ -171,6 +171,37 @@ class FlinkRelMdHandlerTestBase {
   protected lazy val empStreamScan: StreamPhysicalDataStreamScan =
     createDataStreamScan(ImmutableList.of("emp"), streamPhysicalTraits)
 
+  protected lazy val tableSourceTableLogicalScan: LogicalTableScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable1"), logicalTraits)
+  protected lazy val tableSourceTableFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable1"), flinkLogicalTraits)
+  protected lazy val tableSourceTableBatchScan: BatchPhysicalBoundedStreamScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable1"), batchPhysicalTraits)
+  protected lazy val tableSourceTableStreamScan: StreamPhysicalDataStreamScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable1"), streamPhysicalTraits)
+
+  protected lazy val tablePartiallyProjectedKeyLogicalScan: LogicalTableScan =
+    createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+      logicalTraits)
+  protected lazy val tablePartiallyProjectedKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+    createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+      flinkLogicalTraits)
+  protected lazy val tablePartiallyProjectedKeyBatchScan: BatchPhysicalBoundedStreamScan =
+    createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+      batchPhysicalTraits)
+  protected lazy val tablePartiallyProjectedKeyStreamScan: StreamPhysicalDataStreamScan =
+    createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+      streamPhysicalTraits)
+
+  protected lazy val tableSourceTableNonKeyLogicalScan: LogicalTableScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable3"), logicalTraits)
+  protected lazy val tableSourceTableNonKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable3"), flinkLogicalTraits)
+  protected lazy val tableSourceTableNonKeyBatchScan: BatchPhysicalBoundedStreamScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable3"), batchPhysicalTraits)
+  protected lazy val tableSourceTableNonKeyStreamScan: StreamPhysicalDataStreamScan =
+    createTableSourceTable(ImmutableList.of("TableSourceTable3"), streamPhysicalTraits)
+
   private lazy val valuesType = relBuilder.getTypeFactory
     .builder()
     .add("a", SqlTypeName.BIGINT)
@@ -2727,6 +2758,51 @@ class FlinkRelMdHandlerTestBase {
     }
   }
 
+  // select * from TableSourceTable1
+  // left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b
+  protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder
+    .scan("TableSourceTable1")
+    .scan("TableSourceTable2")
+    .join(
+      JoinRelType.LEFT,
+      relBuilder.call(
+        EQUALS,
+        relBuilder.field(2, 0, 1),
+        relBuilder.field(2, 1, 1)
+      )
+    )
+    .build
+
+  // select * from TableSourceTable1
+  // left join TableSourceTable2 on TableSourceTable1.a = TableSourceTable2.a
+  protected lazy val logicalLeftJoinOnDisjointUniqueKeys: RelNode = relBuilder
+    .scan("TableSourceTable1")
+    .scan("TableSourceTable2")
+    .join(
+      JoinRelType.LEFT,
+      relBuilder.call(
+        EQUALS,
+        relBuilder.field(2, 0, 0),
+        relBuilder.field(2, 1, 0)
+      )
+    )
+    .build
+
+  // select * from TableSourceTable1
+  // left join TableSourceTable3 on TableSourceTable1.a = TableSourceTable3.a
+  protected lazy val logicalLeftJoinWithNoneKeyTableUniqueKeys: RelNode = relBuilder
+    .scan("TableSourceTable1")
+    .scan("TableSourceTable3")
+    .join(
+      JoinRelType.LEFT,
+      relBuilder.call(
+        EQUALS,
+        relBuilder.field(2, 0, 0),
+        relBuilder.field(2, 1, 0)
+      )
+    )
+    .build
+
   protected def createDataStreamScan[T](
       tableNames: util.List[String], traitSet: RelTraitSet): T = {
     val table = relBuilder
@@ -2754,6 +2830,33 @@ class FlinkRelMdHandlerTestBase {
     scan.asInstanceOf[T]
   }
 
+  protected def createTableSourceTable[T](
+      tableNames: util.List[String], traitSet: RelTraitSet): T = {
+    val table = relBuilder
+      .getRelOptSchema
+      .asInstanceOf[CalciteCatalogReader]
+      .getTable(tableNames)
+      .asInstanceOf[TableSourceTable]
+    val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE)
+    val scan = conventionTrait match {
+      case Convention.NONE =>
+        relBuilder.clear()
+        val scan = relBuilder.scan(tableNames).build()
+        scan.copy(traitSet, scan.getInputs)
+      case FlinkConventions.LOGICAL =>
+        new FlinkLogicalDataStreamTableScan(
+          cluster, traitSet, Collections.emptyList[RelHint](), table)
+      case FlinkConventions.BATCH_PHYSICAL =>
+        new BatchPhysicalBoundedStreamScan(
+          cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
+      case FlinkConventions.STREAM_PHYSICAL =>
+        new StreamPhysicalDataStreamScan(
+          cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
+      case _ => throw new TableException(s"Unsupported convention trait: $conventionTrait")
+    }
+    scan.asInstanceOf[T]
+  }
+
   protected def createLiteralList(
       rowType: RelDataType,
       literalValues: Seq[String]): util.List[RexLiteral] = {
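
createTableSourceTable mirrors the existing createDataStreamScan helper: it
resolves the named TableSourceTable through the catalog reader and wraps it
in whichever scan node matches the requested convention trait. A hypothetical
call from a test subclass, equivalent to the tableSourceTableStreamScan
fixture above:

    // Build a stream-physical scan over the TableSourceTable1 fixture
    // registered in MetadataTestUtil (hypothetical usage).
    val streamScan = createTableSourceTable[StreamPhysicalDataStreamScan](
      ImmutableList.of("TableSourceTable1"), streamPhysicalTraits)
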
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 9ebee0d..ec748a4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -314,6 +314,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
     assertNull(mq.getUniqueKeys(testRel))
   }
 
+  @Test
+  def testGetUniqueKeysOnTableSourceTable(): Unit = {
+    assertEquals(
+      uniqueKeys(Array(0, 1), Array(0, 1, 5)),
+      mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet
+    )
+    assertEquals(
+      uniqueKeys(Array(0, 1, 5)),
+      mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet
+    )
+    assertEquals(
+      uniqueKeys(),
+      mq.getUniqueKeys(logicalLeftJoinWithNoneKeyTableUniqueKeys).toSet
+    )
+  }
+
   private def uniqueKeys(keys: Array[Int]*): Set[ImmutableBitSet] = {
     keys.map(k => ImmutableBitSet.of(k: _*)).toSet
   }
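
The expected key sets follow from the fixtures: the left input has four
columns (a, b, c, d), so TableSourceTable2's key column b lands at index 5 of
the joined row. In the first join the condition hits the right table's
primary key (b), so each left row matches at most one right row and the left
key {0, 1} survives alongside the combined key {0, 1, 5}. In the second join
the condition is on a non-key column, so only the combination of both sides'
keys, {0, 1, 5}, remains unique. In the third join the right table has no
key, so no unique keys can be derived at all.
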
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 72dab24..f12993f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -58,6 +58,9 @@ object MetadataTestUtil {
     rootSchema.add("TemporalTable1", createTemporalTable1())
     rootSchema.add("TemporalTable2", createTemporalTable2())
     rootSchema.add("TemporalTable3", createTemporalTable3())
+    rootSchema.add("TableSourceTable1", createTableSourceTable1())
+    rootSchema.add("TableSourceTable2", createTableSourceTable2())
+    rootSchema.add("TableSourceTable3", createTableSourceTable3())
     rootSchema.add("projected_table_source_table", createProjectedTableSourceTable())
     rootSchema.add(
       "projected_table_source_table_with_partial_pk",
@@ -258,31 +261,17 @@ object MetadataTestUtil {
     null)
 
   private def createProjectedTableSourceTable(): Table = {
-    val catalogTable = CatalogTable.fromProperties(
-      Map(
-        "connector" -> "values",
-        "bounded" -> "true",
-        "schema.0.name" -> "a",
-        "schema.0.data-type" -> "BIGINT NOT NULL",
-        "schema.1.name" -> "b",
-        "schema.1.data-type" -> "INT",
-        "schema.2.name" -> "c",
-        "schema.2.data-type" -> "VARCHAR(2147483647)",
-        "schema.3.name" -> "d",
-        "schema.3.data-type" -> "BIGINT NOT NULL",
-        "schema.primary-key.name" -> "PK_1",
-        "schema.primary-key.columns" -> "a,d")
-    )
-
     val resolvedSchema = new ResolvedSchema(
       util.Arrays.asList(
         Column.physical("a", DataTypes.BIGINT().notNull()),
         Column.physical("b", DataTypes.INT()),
-        Column.physical("c", DataTypes.STRING()),
+        Column.physical("c", DataTypes.VARCHAR(2147483647)),
         Column.physical("d", DataTypes.BIGINT().notNull())),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "d")))
 
+    val catalogTable = getCatalogTable(resolvedSchema)
+
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
     val rowType = typeFactory.buildRelNodeRowType(
       Seq("a", "c", "d"),
@@ -298,26 +287,108 @@ object MetadataTestUtil {
       )
   }
 
-  private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = {
-    val catalogTable = CatalogTable.fromProperties(
+  private def createTableSourceTable1(): Table = {
+    val catalogTable = CatalogTable.of(
+      org.apache.flink.table.api.Schema.newBuilder
+        .column("a", DataTypes.BIGINT.notNull)
+        .column("b", DataTypes.INT.notNull)
+        .column("c", DataTypes.VARCHAR(2147483647).notNull)
+        .column("d", DataTypes.BIGINT.notNull)
+        .primaryKeyNamed("PK_1", "a", "b")
+        .build,
+      null,
+      Collections.emptyList(),
       Map(
         "connector" -> "values",
-        "bounded" -> "true",
-        "schema.0.name" -> "a",
-        "schema.0.data-type" -> "BIGINT NOT NULL",
-        "schema.1.name" -> "b",
-        "schema.1.data-type" -> "BIGINT NOT NULL",
-        "schema.primary-key.name" -> "PK_1",
-        "schema.primary-key.columns" -> "a,b")
+        "bounded" -> "true"
+      )
     )
 
     val resolvedSchema = new ResolvedSchema(
       util.Arrays.asList(
         Column.physical("a", DataTypes.BIGINT().notNull()),
+        Column.physical("b", DataTypes.INT().notNull()),
+        Column.physical("c", DataTypes.STRING().notNull()),
+        Column.physical("d", DataTypes.BIGINT().notNull())),
+      Collections.emptyList(),
+      UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")))
+
+    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+    val rowType = typeFactory.buildRelNodeRowType(
+      Seq("a", "b", "c", "d"),
+      Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+    new MockTableSourceTable(
+      ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable1"),
+      rowType,
+      new TestTableSource(),
+      true,
+      new ResolvedCatalogTable(catalogTable, resolvedSchema),
+      flinkContext)
+  }
+
+  private def createTableSourceTable2(): Table = {
+    val resolvedSchema = new ResolvedSchema(
+      util.Arrays.asList(
+        Column.physical("a", DataTypes.BIGINT().notNull()),
+        Column.physical("b", DataTypes.INT().notNull()),
+        Column.physical("c", DataTypes.STRING().notNull()),
+        Column.physical("d", DataTypes.BIGINT().notNull())),
+      Collections.emptyList(),
+      UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("b")))
+
+    val catalogTable = getCatalogTable(resolvedSchema)
+
+    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+    val rowType = typeFactory.buildRelNodeRowType(
+      Seq("a", "b", "c", "d"),
+      Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+    new MockTableSourceTable(
+      ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable2"),
+      rowType,
+      new TestTableSource(),
+      true,
+      new ResolvedCatalogTable(catalogTable, resolvedSchema),
+      flinkContext)
+  }
+
+  private def createTableSourceTable3(): Table = {
+    val resolvedSchema = new ResolvedSchema(
+      util.Arrays.asList(
+        Column.physical("a", DataTypes.BIGINT().notNull()),
+        Column.physical("b", DataTypes.INT().notNull()),
+        Column.physical("c", DataTypes.STRING().notNull()),
+        Column.physical("d", DataTypes.BIGINT().notNull())),
+      Collections.emptyList(),
+      null)
+
+    val catalogTable = getCatalogTable(resolvedSchema)
+
+    val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+    val rowType = typeFactory.buildRelNodeRowType(
+      Seq("a", "b", "c", "d"),
+      Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+    new MockTableSourceTable(
+      ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable3"),
+      rowType,
+      new TestTableSource(),
+      true,
+      new ResolvedCatalogTable(catalogTable, resolvedSchema),
+      flinkContext)
+  }
+
+  private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = {
+    val resolvedSchema = new ResolvedSchema(
+      util.Arrays.asList(
+        Column.physical("a", DataTypes.BIGINT().notNull()),
         Column.physical("b", DataTypes.BIGINT().notNull())),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")))
 
+    val catalogTable = getCatalogTable(resolvedSchema)
+
     val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
     val rowType = typeFactory.buildRelNodeRowType(
       Seq("a"),
@@ -335,6 +406,18 @@ object MetadataTestUtil {
       flinkContext)
   }
 
+  private def getCatalogTable(resolvedSchema: ResolvedSchema) = {
+    CatalogTable.of(
+      org.apache.flink.table.api.Schema.newBuilder.fromResolvedSchema(resolvedSchema).build,
+      null,
+      Collections.emptyList(),
+      Map(
+        "connector" -> "values",
+        "bounded" -> "true"
+      )
+    )
+  }
+
   private def getMetadataTable(
       tableSchema: TableSchema,
       statistic: FlinkStatistic,
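
As a side cleanup, the string-property construction via
CatalogTable.fromProperties is replaced throughout with CatalogTable.of plus
an api.Schema derived from the already-declared ResolvedSchema, so each
table's schema is now defined only once. A minimal hypothetical use of the
new getCatalogTable helper, assuming the same imports as this file:

    // Hypothetical fixture: a single-column table keyed on "id".
    val schema = new ResolvedSchema(
      util.Arrays.asList(Column.physical("id", DataTypes.BIGINT().notNull())),
      Collections.emptyList(),
      UniqueConstraint.primaryKey("PK_ID", util.Arrays.asList("id")))
    val catalogTable = getCatalogTable(schema)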