You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this export.
Posted to commits@flink.apache.org by go...@apache.org on 2021/12/07 04:23:55 UTC
[flink] branch master updated: [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ac957ae [FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
ac957ae is described below
commit ac957ae07a49ab88ad46a14c6ea0d4891d5437d7
Author: Marios Trivyzas <ma...@gmail.com>
AuthorDate: Tue Nov 30 11:59:04 2021 +0100
[FLINK-22113][table-planner] Implement column uniqueness checking for TableSourceTable
This closes #17962
Co-authored-by: guanghxu <xu...@gmail.com>
---
.../plan/metadata/FlinkRelMdColumnUniqueness.scala | 37 +-----
.../plan/metadata/FlinkRelMdUniqueKeys.scala | 1 -
.../metadata/FlinkRelMdColumnUniquenessTest.scala | 47 +++++++
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 105 +++++++++++++++-
.../plan/metadata/FlinkRelMdUniqueKeysTest.scala | 16 +++
.../planner/plan/metadata/MetadataTestUtil.scala | 135 +++++++++++++++++----
6 files changed, 282 insertions(+), 59 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 6d7382c..241f529 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -23,16 +23,12 @@ import org.apache.flink.table.planner.JBoolean
import org.apache.flink.table.planner.expressions.PlannerNamedWindowProperty
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, WindowAggregate}
-import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.nodes.physical.batch._
import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin
import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.FlinkPreparingTableBase
import org.apache.flink.table.planner.plan.utils.{FlinkRelMdUtil, RankUtil}
import org.apache.flink.table.runtime.operators.rank.RankType
-import org.apache.flink.table.sources.TableSource
-import org.apache.calcite.plan.RelOptTable
import org.apache.calcite.plan.volcano.RelSubset
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.Converter
@@ -61,42 +57,21 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata
mq: RelMetadataQuery,
columns: ImmutableBitSet,
ignoreNulls: Boolean): JBoolean = {
- areTableColumnsUnique(rel, null, rel.getTable, columns)
- }
-
- def areColumnsUnique(
- rel: FlinkLogicalLegacyTableSourceScan,
- mq: RelMetadataQuery,
- columns: ImmutableBitSet,
- ignoreNulls: Boolean): JBoolean = {
- areTableColumnsUnique(rel, rel.tableSource, rel.getTable, columns)
+ areTableColumnsUnique(rel, mq.getUniqueKeys(rel, ignoreNulls), columns)
}
private def areTableColumnsUnique(
rel: TableScan,
- tableSource: TableSource[_],
- relOptTable: RelOptTable,
+ uniqueKeys: util.Set[ImmutableBitSet],
columns: ImmutableBitSet): JBoolean = {
if (columns.cardinality == 0) {
return false
}
- // TODO get uniqueKeys from TableSchema of TableSource
-
- relOptTable match {
- case table: FlinkPreparingTableBase => {
- val ukOptional = table.uniqueKeysSet
- if (ukOptional.isPresent) {
- if (ukOptional.get().isEmpty) {
- false
- } else {
- ukOptional.get().exists(columns.contains)
- }
- } else {
- null
- }
- }
- case _ => rel.getTable.isKey(columns)
+ if (uniqueKeys != null) {
+ uniqueKeys.exists(columns.contains) || rel.getTable.isKey(columns)
+ } else {
+ null
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 4a76839..63547eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -45,7 +45,6 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.util.{Bug, BuiltInMethod, ImmutableBitSet, Util}
import java.util
-import java.util.Set
import scala.collection.JavaConversions._
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
index 31c8cf0..4e545d3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniquenessTest.scala
@@ -622,4 +622,51 @@ class FlinkRelMdColumnUniquenessTest extends FlinkRelMdHandlerTestBase {
}
}
+ @Test
+ def testAreColumnsUniqueOnTableSourceTable(): Unit = {
+ Array(
+ tableSourceTableLogicalScan,
+ tableSourceTableFlinkLogicalScan,
+ tableSourceTableBatchScan,
+ tableSourceTableStreamScan
+ )
+ .foreach { scan =>
+ assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(1, 2)))
+ assertTrue(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+ }
+ }
+
+ @Test
+ def testAreColumnsUniqueOnTablePartiallyProjectedKey(): Unit = {
+ Array(
+ tablePartiallyProjectedKeyLogicalScan,
+ tablePartiallyProjectedKeyFlinkLogicalScan,
+ tablePartiallyProjectedKeyBatchScan,
+ tablePartiallyProjectedKeyStreamScan
+ )
+ .foreach { scan =>
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+ }
+ }
+
+ @Test
+ def testAreColumnsUniqueOntableSourceTableNonKeyNonKey(): Unit = {
+ Array(
+ tableSourceTableNonKeyLogicalScan,
+ tableSourceTableNonKeyFlinkLogicalScan,
+ tableSourceTableNonKeyBatchScan,
+ tableSourceTableNonKeyStreamScan
+ )
+ .foreach { scan =>
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2)))
+ assertFalse(mq.areColumnsUnique(scan, ImmutableBitSet.of(0, 1, 2, 3)))
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 3a3acc8..f935697 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.nodes.physical.batch._
import org.apache.flink.table.planner.plan.nodes.physical.stream._
-import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable}
+import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, IntermediateRelTable, TableSourceTable}
import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable
import org.apache.flink.table.planner.plan.utils._
import org.apache.flink.table.planner.utils.Top3
@@ -171,6 +171,37 @@ class FlinkRelMdHandlerTestBase {
protected lazy val empStreamScan: StreamPhysicalDataStreamScan =
createDataStreamScan(ImmutableList.of("emp"), streamPhysicalTraits)
+ protected lazy val tableSourceTableLogicalScan: LogicalTableScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable1"), logicalTraits)
+ protected lazy val tableSourceTableFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable1"), flinkLogicalTraits)
+ protected lazy val tableSourceTableBatchScan: BatchPhysicalBoundedStreamScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable1"), batchPhysicalTraits)
+ protected lazy val tableSourceTableStreamScan: StreamPhysicalDataStreamScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable1"), streamPhysicalTraits)
+
+ protected lazy val tablePartiallyProjectedKeyLogicalScan: LogicalTableScan =
+ createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+ logicalTraits)
+ protected lazy val tablePartiallyProjectedKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+ createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+ flinkLogicalTraits)
+ protected lazy val tablePartiallyProjectedKeyBatchScan: BatchPhysicalBoundedStreamScan =
+ createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+ batchPhysicalTraits)
+ protected lazy val tablePartiallyProjectedKeyStreamScan: StreamPhysicalDataStreamScan =
+ createTableSourceTable(ImmutableList.of("projected_table_source_table_with_partial_pk"),
+ streamPhysicalTraits)
+
+ protected lazy val tableSourceTableNonKeyLogicalScan: LogicalTableScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable3"), logicalTraits)
+ protected lazy val tableSourceTableNonKeyFlinkLogicalScan: FlinkLogicalDataStreamTableScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable3"), flinkLogicalTraits)
+ protected lazy val tableSourceTableNonKeyBatchScan: BatchPhysicalBoundedStreamScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable3"), batchPhysicalTraits)
+ protected lazy val tableSourceTableNonKeyStreamScan: StreamPhysicalDataStreamScan =
+ createTableSourceTable(ImmutableList.of("TableSourceTable3"), streamPhysicalTraits)
+
private lazy val valuesType = relBuilder.getTypeFactory
.builder()
.add("a", SqlTypeName.BIGINT)
@@ -2727,6 +2758,51 @@ class FlinkRelMdHandlerTestBase {
}
}
+ // select * from TableSourceTable1
+ // left join TableSourceTable2 on TableSourceTable1.b = TableSourceTable2.b
+ protected lazy val logicalLeftJoinOnContainedUniqueKeys: RelNode = relBuilder
+ .scan("TableSourceTable1")
+ .scan("TableSourceTable2")
+ .join(
+ JoinRelType.LEFT,
+ relBuilder.call(
+ EQUALS,
+ relBuilder.field(2, 0, 1),
+ relBuilder.field(2, 1, 1)
+ )
+ )
+ .build
+
+ // select * from TableSourceTable1
+ // left join TableSourceTable2 on TableSourceTable1.a = TableSourceTable2.a
+ protected lazy val logicalLeftJoinOnDisjointUniqueKeys: RelNode = relBuilder
+ .scan("TableSourceTable1")
+ .scan("TableSourceTable2")
+ .join(
+ JoinRelType.LEFT,
+ relBuilder.call(
+ EQUALS,
+ relBuilder.field(2, 0, 0),
+ relBuilder.field(2, 1, 0)
+ )
+ )
+ .build
+
+ // select * from TableSourceTable1
+ // left join TableSourceTable3 on TableSourceTable1.a = TableSourceTable3.a
+ protected lazy val logicalLeftJoinWithNoneKeyTableUniqueKeys: RelNode = relBuilder
+ .scan("TableSourceTable1")
+ .scan("TableSourceTable3")
+ .join(
+ JoinRelType.LEFT,
+ relBuilder.call(
+ EQUALS,
+ relBuilder.field(2, 0, 0),
+ relBuilder.field(2, 1, 0)
+ )
+ )
+ .build
+
protected def createDataStreamScan[T](
tableNames: util.List[String], traitSet: RelTraitSet): T = {
val table = relBuilder
@@ -2754,6 +2830,33 @@ class FlinkRelMdHandlerTestBase {
scan.asInstanceOf[T]
}
+ protected def createTableSourceTable[T](
+ tableNames: util.List[String], traitSet: RelTraitSet): T = {
+ val table = relBuilder
+ .getRelOptSchema
+ .asInstanceOf[CalciteCatalogReader]
+ .getTable(tableNames)
+ .asInstanceOf[TableSourceTable]
+ val conventionTrait = traitSet.getTrait(ConventionTraitDef.INSTANCE)
+ val scan = conventionTrait match {
+ case Convention.NONE =>
+ relBuilder.clear()
+ val scan = relBuilder.scan(tableNames).build()
+ scan.copy(traitSet, scan.getInputs)
+ case FlinkConventions.LOGICAL =>
+ new FlinkLogicalDataStreamTableScan(
+ cluster, traitSet, Collections.emptyList[RelHint](), table)
+ case FlinkConventions.BATCH_PHYSICAL =>
+ new BatchPhysicalBoundedStreamScan(
+ cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
+ case FlinkConventions.STREAM_PHYSICAL =>
+ new StreamPhysicalDataStreamScan(
+ cluster, traitSet, Collections.emptyList[RelHint](), table, table.getRowType)
+ case _ => throw new TableException(s"Unsupported convention trait: $conventionTrait")
+ }
+ scan.asInstanceOf[T]
+ }
+
protected def createLiteralList(
rowType: RelDataType,
literalValues: Seq[String]): util.List[RexLiteral] = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
index 9ebee0d..ec748a4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeysTest.scala
@@ -314,6 +314,22 @@ class FlinkRelMdUniqueKeysTest extends FlinkRelMdHandlerTestBase {
assertNull(mq.getUniqueKeys(testRel))
}
+ @Test
+ def testGetUniqueKeysOnTableScanTable(): Unit = {
+ assertEquals(
+ uniqueKeys(Array(0, 1), Array(0, 1, 5)),
+ mq.getUniqueKeys(logicalLeftJoinOnContainedUniqueKeys).toSet
+ )
+ assertEquals(
+ uniqueKeys(Array(0, 1, 5)),
+ mq.getUniqueKeys(logicalLeftJoinOnDisjointUniqueKeys).toSet
+ )
+ assertEquals(
+ uniqueKeys(),
+ mq.getUniqueKeys(logicalLeftJoinWithNoneKeyTableUniqueKeys).toSet
+ )
+ }
+
private def uniqueKeys(keys: Array[Int]*): Set[ImmutableBitSet] = {
keys.map(k => ImmutableBitSet.of(k: _*)).toSet
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 72dab24..f12993f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -58,6 +58,9 @@ object MetadataTestUtil {
rootSchema.add("TemporalTable1", createTemporalTable1())
rootSchema.add("TemporalTable2", createTemporalTable2())
rootSchema.add("TemporalTable3", createTemporalTable3())
+ rootSchema.add("TableSourceTable1", createTableSourceTable1())
+ rootSchema.add("TableSourceTable2", createTableSourceTable2())
+ rootSchema.add("TableSourceTable3", createTableSourceTable3())
rootSchema.add("projected_table_source_table", createProjectedTableSourceTable())
rootSchema.add(
"projected_table_source_table_with_partial_pk",
@@ -258,31 +261,17 @@ object MetadataTestUtil {
null)
private def createProjectedTableSourceTable(): Table = {
- val catalogTable = CatalogTable.fromProperties(
- Map(
- "connector" -> "values",
- "bounded" -> "true",
- "schema.0.name" -> "a",
- "schema.0.data-type" -> "BIGINT NOT NULL",
- "schema.1.name" -> "b",
- "schema.1.data-type" -> "INT",
- "schema.2.name" -> "c",
- "schema.2.data-type" -> "VARCHAR(2147483647)",
- "schema.3.name" -> "d",
- "schema.3.data-type" -> "BIGINT NOT NULL",
- "schema.primary-key.name" -> "PK_1",
- "schema.primary-key.columns" -> "a,d")
- )
-
val resolvedSchema = new ResolvedSchema(
util.Arrays.asList(
Column.physical("a", DataTypes.BIGINT().notNull()),
Column.physical("b", DataTypes.INT()),
- Column.physical("c", DataTypes.STRING()),
+ Column.physical("c", DataTypes.VARCHAR(2147483647)),
Column.physical("d", DataTypes.BIGINT().notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "d")))
+ val catalogTable = getCatalogTable(resolvedSchema)
+
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
val rowType = typeFactory.buildRelNodeRowType(
Seq("a", "c", "d"),
@@ -298,26 +287,108 @@ object MetadataTestUtil {
)
}
- private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = {
- val catalogTable = CatalogTable.fromProperties(
+ private def createTableSourceTable1(): Table = {
+ val catalogTable = CatalogTable.of(
+ org.apache.flink.table.api.Schema.newBuilder
+ .column("a", DataTypes.BIGINT.notNull)
+ .column("b", DataTypes.INT.notNull)
+ .column("c", DataTypes.VARCHAR(2147483647).notNull)
+ .column("d", DataTypes.BIGINT.notNull)
+ .primaryKeyNamed("PK_1", "a", "b")
+ .build,
+ null,
+ Collections.emptyList(),
Map(
"connector" -> "values",
- "bounded" -> "true",
- "schema.0.name" -> "a",
- "schema.0.data-type" -> "BIGINT NOT NULL",
- "schema.1.name" -> "b",
- "schema.1.data-type" -> "BIGINT NOT NULL",
- "schema.primary-key.name" -> "PK_1",
- "schema.primary-key.columns" -> "a,b")
+ "bounded" -> "true"
+ )
)
val resolvedSchema = new ResolvedSchema(
util.Arrays.asList(
Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.INT().notNull()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.BIGINT().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")))
+
+ val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+ val rowType = typeFactory.buildRelNodeRowType(
+ Seq("a", "b", "c", "d"),
+ Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+ new MockTableSourceTable(
+ ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable1"),
+ rowType,
+ new TestTableSource(),
+ true,
+ new ResolvedCatalogTable(catalogTable, resolvedSchema),
+ flinkContext)
+ }
+
+ private def createTableSourceTable2(): Table = {
+ val resolvedSchema = new ResolvedSchema(
+ util.Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.INT().notNull()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.BIGINT().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("b")))
+
+ val catalogTable = getCatalogTable(resolvedSchema)
+
+ val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+ val rowType = typeFactory.buildRelNodeRowType(
+ Seq("a", "b", "c", "d"),
+ Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+ new MockTableSourceTable(
+ ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable2"),
+ rowType,
+ new TestTableSource(),
+ true,
+ new ResolvedCatalogTable(catalogTable, resolvedSchema),
+ flinkContext)
+ }
+
+ private def createTableSourceTable3(): Table = {
+ val resolvedSchema = new ResolvedSchema(
+ util.Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
+ Column.physical("b", DataTypes.INT().notNull()),
+ Column.physical("c", DataTypes.STRING().notNull()),
+ Column.physical("d", DataTypes.BIGINT().notNull())),
+ Collections.emptyList(),
+ null)
+
+ val catalogTable = getCatalogTable(resolvedSchema)
+
+ val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
+ val rowType = typeFactory.buildRelNodeRowType(
+ Seq("a", "b", "c", "d"),
+ Seq(new BigIntType(false), new IntType(), new VarCharType(false, 100), new BigIntType(false)))
+
+ new MockTableSourceTable(
+ ObjectIdentifier.of("default_catalog", "default_database", "TableSourceTable3"),
+ rowType,
+ new TestTableSource(),
+ true,
+ new ResolvedCatalogTable(catalogTable, resolvedSchema),
+ flinkContext)
+ }
+
+ private def createProjectedTableSourceTableWithPartialCompositePrimaryKey(): Table = {
+ val resolvedSchema = new ResolvedSchema(
+ util.Arrays.asList(
+ Column.physical("a", DataTypes.BIGINT().notNull()),
Column.physical("b", DataTypes.BIGINT().notNull())),
Collections.emptyList(),
UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")))
+ val catalogTable = getCatalogTable(resolvedSchema)
+
val typeFactory = new FlinkTypeFactory(new FlinkTypeSystem)
val rowType = typeFactory.buildRelNodeRowType(
Seq("a"),
@@ -335,6 +406,18 @@ object MetadataTestUtil {
flinkContext)
}
+ private def getCatalogTable(resolvedSchema: ResolvedSchema) = {
+ CatalogTable.of(
+ org.apache.flink.table.api.Schema.newBuilder.fromResolvedSchema(resolvedSchema).build,
+ null,
+ Collections.emptyList(),
+ Map(
+ "connector" -> "values",
+ "bounded" -> "true"
+ )
+ )
+ }
+
private def getMetadataTable(
tableSchema: TableSchema,
statistic: FlinkStatistic,