Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/19 15:21:42 UTC

[GitHub] [flink] wuchong commented on a change in pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

wuchong commented on a change in pull request #13618:
URL: https://github.com/apache/flink/pull/13618#discussion_r507819555



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -306,4 +301,101 @@ class TableSinkTest extends TableTestBase {
     util.verifyPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testExceptionForWritingVirtualMetadataColumn(): Unit = {
+    // test reordering, skipping, casting of (virtual) metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,
+         |  `m_2` INT METADATA FROM 'metadata_2',
+         |  `b` BIGINT,
+         |  `c` INT,
+         |  `metadata_1` STRING METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',
+         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT *
+        |FROM MetadataTable
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Query schema: [a: INT, m_3: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]\n" +
+      "Sink schema:  [a: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]")
+
+    util.verifyPlan(stmtSet)
+  }
+
+  @Test
+  def testExceptionForWritingInvalidMetadataColumn(): Unit = {
+    // test casting of metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `metadata_1` TIMESTAMP(3) METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'writable-metadata' = 'metadata_1:BOOLEAN'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT TIMESTAMP '1990-10-14 06:00:00.000'
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Invalid data type for metadata key 'metadata_1' in column 'metadata_1' of table " +

Review comment:
       It looks verbose because 'metadata_1' appears twice. Can we simplify it to `Invalid data type for metadata column 'metadata_1' of table` when the column doesn't define a metadata alias?
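
       A rough sketch (not a final proposal) of how the shorter message could be produced in `validateAndApplyMetadata` of the new `DynamicSinkUtils`, using the variable names from that class:

```java
// sketch only: mention the metadata key separately only when an alias is defined,
// i.e. when the key differs from the column name
final String target = metadataKey.equals(metadataColumn.getName())
	? String.format("metadata column '%s'", metadataColumn.getName())
	: String.format("metadata key '%s' in column '%s'", metadataKey, metadataColumn.getName());
// ... then use `target` in the "Invalid data type for ..." message
```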

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -262,6 +266,25 @@ public DataType toPhysicalRowDataType() {
 		return ROW(fields);
 	}
 
+	/**
+	 * Converts all persisted columns of this schema into a (possibly nested) row data type.
+	 *
+	 * <p>This method returns the query-to-sink schema.
+	 *
+	 * <p>Note: Computed columns and virtual columns are excluded in the returned row data type.
+	 *
+	 * @see DataTypes#ROW(Field...)
+	 * @see #toRowDataType()
+	 * @see #toPhysicalRowDataType()
+	 */
+	public DataType toPersistedRowDataType() {

Review comment:
       It seems that this method is only used in the planner, not by connectors. Should we move this method to some internal utils? At first glance, `toPhysicalRowDataType` vs. `toPersistedRowDataType` confused me a lot.

   Or we could add `The returned row data type contains persisted metadata columns in addition to those in {@link #toPhysicalRowDataType()}` to the Note.
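
       For instance, with a hypothetical schema (not from this PR), the three methods would differ as follows:

```java
// CREATE TABLE t (
//   `a`  INT,                    -- physical
//   `c`  AS a + 1,               -- computed
//   `m1` INT METADATA,           -- persisted metadata
//   `m2` INT METADATA VIRTUAL    -- virtual metadata
// )
//
// schema.toPhysicalRowDataType()  -> ROW<`a` INT>                                (physical only)
// schema.toPersistedRowDataType() -> ROW<`a` INT, `m1` INT>                      (physical + persisted metadata)
// schema.toRowDataType()          -> ROW<`a` INT, `c` INT, `m1` INT, `m2` INT>   (all declared columns)
```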

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
##########
@@ -76,6 +76,27 @@ class TableScanTest extends TableTestBase {
     util.verifyPlan("SELECT * FROM t1")

Review comment:
       Could you add plan tests for projection pushdown and metadata pushdown?
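
       A rough sketch of what such a test could look like, reusing the `values` connector options from this PR (the table name, columns, and the assumption that the testing connector applies projection push-down by default are mine):

```scala
  @Test
  def testReadMetadataWithProjectionPushDown(): Unit = {
    util.addTable(
      s"""
         |CREATE TABLE MetadataTable (
         |  `id` BIGINT,
         |  `name` STRING,
         |  `offset` BIGINT METADATA VIRTUAL
         |) WITH (
         |  'connector' = 'values',
         |  'readable-metadata' = 'offset:BIGINT'
         |)
       """.stripMargin)

    // only `id` and the metadata column are selected, so both the projection
    // and the metadata read should be pushed into the source
    util.verifyPlan("SELECT `id`, `offset` FROM MetadataTable")
  }
```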

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
##########
@@ -270,6 +285,11 @@ public boolean isPhysical() {
 			return false;
 		}
 
+		@Override
+		public boolean isPersisted() {
+			return !isVirtual;
+		}
+
 		@Override
 		public Optional<String> explainExtras() {
 			final StringBuilder sb = new StringBuilder();

Review comment:
       Add single quotes around the `metadataAlias`?
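
       For illustration only (the actual fields and flow in `explainExtras()` may differ), something along these lines:

```java
// sketch: quote the alias so the explain output matches the DDL syntax,
// e.g. METADATA FROM 'metadata_3' VIRTUAL instead of METADATA FROM metadata_3 VIRTUAL
sb.append("METADATA");
if (metadataAlias != null) {
	sb.append(" FROM '").append(metadataAlias).append("'");
}
if (isVirtual) {
	sb.append(" VIRTUAL");
}
```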

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -306,4 +301,101 @@ class TableSinkTest extends TableTestBase {
     util.verifyPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testExceptionForWritingVirtualMetadataColumn(): Unit = {
+    // test reordering, skipping, casting of (virtual) metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,
+         |  `m_2` INT METADATA FROM 'metadata_2',
+         |  `b` BIGINT,
+         |  `c` INT,
+         |  `metadata_1` STRING METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',
+         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT *
+        |FROM MetadataTable
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Query schema: [a: INT, m_3: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]\n" +
+      "Sink schema:  [a: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]")
+
+    util.verifyPlan(stmtSet)
+  }
+
+  @Test
+  def testExceptionForWritingInvalidMetadataColumn(): Unit = {
+    // test casting of metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `metadata_1` TIMESTAMP(3) METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'writable-metadata' = 'metadata_1:BOOLEAN'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT TIMESTAMP '1990-10-14 06:00:00.000'
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Invalid data type for metadata key 'metadata_1' in column 'metadata_1' of table " +
+      "'default_catalog.default_database.MetadataTable'. The declared type 'TIMESTAMP(3)' cannot " +
+      "be cast to consumed metadata type 'BOOLEAN'.")

Review comment:
       It may be confusing to users what "consumed" means here. Can we say something like `The metadata 'metadata_1' cannot be declared as 'TIMESTAMP(3)', but it can be declared with any type that is castable to 'BOOLEAN'.`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/DynamicSinkUtils.java
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.sinks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableColumn.MetadataColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
+import org.apache.flink.table.planner.plan.schema.CatalogSourceTable;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeTransformations;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsExplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+
+/**
+ * Utilities for dealing with {@link DynamicTableSink}.
+ */
+@Internal
+public final class DynamicSinkUtils {
+
+	/**
+	 * Similar to {@link CatalogSourceTable#toRel(RelOptTable.ToRelContext)}, converts a given
+	 * {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if necessary.
+	 */
+	public static RelNode toRel(
+			FlinkRelBuilder relBuilder,
+			RelNode input,
+			CatalogSinkModifyOperation sinkOperation,
+			DynamicTableSink sink,
+			CatalogTable table) {
+		final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder);
+		final TableSchema schema = table.getSchema();
+
+		// 1. prepare table sink
+		prepareDynamicSink(sinkOperation, sink, table);
+
+		// 2. validate the query schema to the sink's table schema and apply cast if possible
+		final RelNode query = validateSchemaAndApplyImplicitCast(
+			input,
+			schema,
+			sinkOperation.getTableIdentifier(),
+			typeFactory);
+		relBuilder.push(query);
+
+		// 3. convert the sink's table schema to the consumed data type of the sink
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+		if (!metadataColumns.isEmpty()) {
+			pushMetadataProjection(relBuilder, typeFactory, schema, sink);
+		}
+
+		final RelNode finalQuery = relBuilder.build();
+
+		return LogicalSink.create(
+			finalQuery,
+			sinkOperation.getTableIdentifier(),
+			table,
+			sink,
+			sinkOperation.getStaticPartitions());
+	}
+
+	/**
+	 * Checks if the given query can be written into the given sink's table schema.
+	 *
+	 * <p>It checks whether field types are compatible (types should be equal including precisions).
+	 * If types are not compatible, but can be implicitly cast, a cast projection will be applied.
+	 * Otherwise, an exception will be thrown.
+	 */
+	public static RelNode validateSchemaAndApplyImplicitCast(
+			RelNode query,
+			TableSchema sinkSchema,
+			@Nullable ObjectIdentifier sinkIdentifier,
+			FlinkTypeFactory typeFactory) {
+		final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType());
+		final List<RowField> queryFields = queryType.getFields();
+
+		final RowType sinkType = (RowType) fixSinkDataType(sinkSchema.toPersistedRowDataType()).getLogicalType();
+		final List<RowField> sinkFields = sinkType.getFields();
+
+		if (queryFields.size() != sinkFields.size()) {
+			throw createSchemaMismatchException(
+				"Different number of columns.",
+				sinkIdentifier,
+				queryFields,
+				sinkFields);
+		}
+
+		boolean requiresCasting = false;
+		for (int i = 0; i < sinkFields.size(); i++) {
+			final LogicalType queryColumnType = queryFields.get(i).getType();
+			final LogicalType sinkColumnType = sinkFields.get(i).getType();
+			if (!supportsImplicitCast(queryColumnType, sinkColumnType)) {
+				throw createSchemaMismatchException(
+					String.format(
+						"Incompatible types for sink column '%s' at position %s.",
+						sinkFields.get(i).getName(),
+						i),
+					sinkIdentifier,
+					queryFields,
+					sinkFields);
+			}
+			if (!supportsAvoidingCast(queryColumnType, sinkColumnType)) {
+				requiresCasting = true;
+			}
+		}
+
+		if (requiresCasting) {
+			final RelDataType castRelDataType = typeFactory.buildRelNodeRowType(sinkType);
+			return RelOptUtil.createCastRel(query, castRelDataType, true);
+		}
+		return query;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a projection that reorders physical and metadata columns according to the consumed data
+	 * type of the sink. It casts metadata columns into the expected data type.
+	 *
+	 * @see SupportsWritingMetadata
+	 */
+	private static void pushMetadataProjection(
+			FlinkRelBuilder relBuilder,
+			FlinkTypeFactory typeFactory,
+			TableSchema schema,
+			DynamicTableSink sink) {
+		final RexBuilder rexBuilder = relBuilder.getRexBuilder();
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+
+		final List<Integer> physicalColumns = extractPhysicalColumns(schema);
+
+		final Map<String, Integer> keyToMetadataColumn = extractPersistedMetadataColumns(schema)
+			.stream()
+			.collect(
+				Collectors.toMap(
+					pos -> {
+						final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+						return metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+					},
+					Function.identity()
+				)
+			);
+
+		final List<Integer> metadataColumns = createRequiredMetadataKeys(schema, sink)
+			.stream()
+			.map(keyToMetadataColumn::get)
+			.collect(Collectors.toList());
+
+		final List<String> fieldNames = Stream
+			.concat(
+				physicalColumns.stream()
+					.map(tableColumns::get)
+					.map(TableColumn::getName),
+				metadataColumns.stream()
+					.map(tableColumns::get)
+					.map(MetadataColumn.class::cast)
+					.map(c -> c.getMetadataAlias().orElse(c.getName()))
+			)
+			.collect(Collectors.toList());
+
+		final Map<String, DataType> metadataMap = extractMetadataMap(sink);
+
+		final List<RexNode> fieldNodes = Stream
+			.concat(
+				physicalColumns
+					.stream()
+					.map(pos -> {
+						final int posAdjusted = adjustByVirtualColumns(tableColumns, pos);
+						return relBuilder.field(posAdjusted);
+					}),
+				metadataColumns
+					.stream()
+					.map(pos -> {
+						final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+						final String metadataKey = metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+
+						final LogicalType expectedType = metadataMap.get(metadataKey).getLogicalType();
+						final RelDataType expectedRelDataType = typeFactory.createFieldTypeFromLogicalType(expectedType);
+
+						final int posAdjusted = adjustByVirtualColumns(tableColumns, pos);
+						return rexBuilder.makeAbstractCast(expectedRelDataType, relBuilder.field(posAdjusted));
+					})
+			)
+			.collect(Collectors.toList());
+
+		relBuilder.projectNamed(fieldNodes, fieldNames, true);
+	}
+
+	/**
+	 * Prepares the given {@link DynamicTableSink}. It checks whether the sink is compatible with the
+	 * INSERT INTO clause and applies initial parameters.
+	 */
+	private static void prepareDynamicSink(
+			CatalogSinkModifyOperation sinkOperation,
+			DynamicTableSink sink,
+			CatalogTable table) {
+		final ObjectIdentifier sinkIdentifier = sinkOperation.getTableIdentifier();
+
+		validatePartitioning(sinkOperation, sinkIdentifier, sink, table.getPartitionKeys());
+
+		validateAndApplyOverwrite(sinkOperation, sinkIdentifier, sink);
+
+		validateAndApplyMetadata(sinkIdentifier, sink, table.getSchema());
+	}
+
+	/**
+	 * Returns a list of required metadata keys. Ordered by the iteration order of
+	 * {@link SupportsWritingMetadata#listWritableMetadata()}.
+	 *
+	 * <p>This method assumes that sink and schema have been validated via
+	 * {@link #prepareDynamicSink(CatalogSinkModifyOperation, DynamicTableSink, CatalogTable)}.
+	 */
+	private static List<String> createRequiredMetadataKeys(TableSchema schema, DynamicTableSink sink) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+
+		final Set<String> requiredMetadataKeys = metadataColumns
+			.stream()
+			.map(tableColumns::get)
+			.map(MetadataColumn.class::cast)
+			.map(c -> c.getMetadataAlias().orElse(c.getName()))
+			.collect(Collectors.toSet());
+
+		final Map<String, DataType> metadataMap = extractMetadataMap(sink);
+
+		return metadataMap.keySet()
+			.stream()
+			.filter(requiredMetadataKeys::contains)
+			.collect(Collectors.toList());
+	}
+
+	private static ValidationException createSchemaMismatchException(
+			String cause,
+			@Nullable ObjectIdentifier sinkIdentifier,
+			List<RowField> queryFields,
+			List<RowField> sinkFields) {
+		final String querySchema = queryFields.stream()
+			.map(f -> f.getName() + ": " + f.getType().asSummaryString())
+			.collect(Collectors.joining(", ", "[", "]"));
+		final String sinkSchema = sinkFields.stream()
+			.map(sinkField -> sinkField.getName() + ": " + sinkField.getType().asSummaryString())
+			.collect(Collectors.joining(", ", "[", "]"));
+		final String tableName;
+		if (sinkIdentifier != null) {
+			tableName = "registered table '" + sinkIdentifier.asSummaryString() + "'";
+		} else {
+			tableName = "unregistered table";
+		}
+
+		return new ValidationException(
+			String.format(
+				"Column types of query result and sink for %s do not match.\n" +
+					"Cause: %s\n\n" +
+					"Query schema: %s\n" +
+					"Sink schema:  %s",
+				tableName,
+				cause,
+				querySchema,
+				sinkSchema
+			)
+		);
+	}
+
+	private static DataType fixSinkDataType(DataType sinkDataType) {
+		// we treat legacy decimal the same as the default decimal
+		// we ignore NULL constraint, the NULL constraint will be checked during runtime
+		// see StreamExecSink and BatchExecSink
+		return DataTypeUtils.transform(
+			sinkDataType,
+			TypeTransformations.legacyDecimalToDefaultDecimal(),
+			TypeTransformations.legacyRawToTypeInfoRaw(),
+			TypeTransformations.toNullable());
+	}
+
+	private static void validatePartitioning(
+			CatalogSinkModifyOperation sinkOperation,
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink,
+			List<String> partitionKeys) {
+		if (!partitionKeys.isEmpty()) {
+			if (!(sink instanceof SupportsPartitioning)) {
+				throw new TableException(
+					String.format(
+						"Table '%s' is a partitioned table, but the underlying %s doesn't " +
+							"implement the %s interface.",
+						sinkIdentifier.asSummaryString(),
+						DynamicTableSink.class.getSimpleName(),
+						SupportsPartitioning.class.getSimpleName()
+					)
+				);
+			}
+		}
+
+		final Map<String, String> staticPartitions = sinkOperation.getStaticPartitions();
+		staticPartitions.keySet().forEach(p -> {
+			if (!partitionKeys.contains(p)) {
+				throw new ValidationException(
+					String.format(
+						"Static partition column '%s' should be in the partition keys list %s for table '%s'.",
+						p,
+						partitionKeys,
+						sinkIdentifier.asSummaryString()
+					)
+				);
+			}
+		});
+	}
+
+	private static void validateAndApplyOverwrite(
+			CatalogSinkModifyOperation sinkOperation,
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink) {
+		if (!sinkOperation.isOverwrite()) {
+			return;
+		}
+		if (!(sink instanceof SupportsOverwrite)) {
+			throw new ValidationException(
+				String.format(
+					"INSERT OVERWRITE requires that the underlying %s of table '%s' " +
+						"implements the %s interface.",
+					DynamicTableSink.class.getSimpleName(),
+					sinkIdentifier.asSummaryString(),
+					SupportsOverwrite.class.getSimpleName()
+				)
+			);
+		}
+		final SupportsOverwrite overwriteSink = (SupportsOverwrite) sink;
+		overwriteSink.applyOverwrite(sinkOperation.isOverwrite());
+	}
+
+	private static List<Integer> extractPhysicalColumns(TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		return IntStream.range(0, schema.getFieldCount())
+			.filter(pos -> tableColumns.get(pos).isPhysical())
+			.boxed()
+			.collect(Collectors.toList());
+	}
+
+	private static List<Integer> extractPersistedMetadataColumns(TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		return IntStream.range(0, schema.getFieldCount())
+			.filter(pos -> {
+				final TableColumn tableColumn = tableColumns.get(pos);
+				return tableColumn instanceof MetadataColumn && tableColumn.isPersisted();
+			})
+			.boxed()
+			.collect(Collectors.toList());
+	}
+
+	private static int adjustByVirtualColumns(List<TableColumn> tableColumns, int pos) {
+		return pos - (int) IntStream.range(0, pos).filter(i -> !tableColumns.get(i).isPersisted()).count();
+	}
+
+	private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
+		if (sink instanceof SupportsWritingMetadata) {
+			return ((SupportsWritingMetadata) sink).listWritableMetadata();
+		}
+		return Collections.emptyMap();
+	}
+
+	private static void validateAndApplyMetadata(
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink,
+			TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+
+		if (metadataColumns.isEmpty()) {
+			return;
+		}
+
+		if (!(sink instanceof SupportsWritingMetadata)) {
+			throw new ValidationException(
+				String.format(
+					"Table '%s' declares persistable metadata columns, but the underlying %s " +
+						"doesn't implement the %s interface. If the column should not " +
+						"be persisted, it can be declared with the VIRTUAL keyword.",
+					sinkIdentifier.asSummaryString(),
+					DynamicTableSink.class.getSimpleName(),
+					SupportsWritingMetadata.class.getSimpleName()
+				)
+			);
+		}
+
+		final SupportsWritingMetadata metadataSink = (SupportsWritingMetadata) sink;
+
+		final Map<String, DataType> metadataMap = ((SupportsWritingMetadata) sink).listWritableMetadata();
+		metadataColumns.forEach(pos -> {
+			final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+			final String metadataKey = metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+			final LogicalType metadataType = metadataColumn.getType().getLogicalType();
+			final DataType expectedMetadataDataType = metadataMap.get(metadataKey);
+			// check that metadata key is valid
+			if (expectedMetadataDataType == null) {
+				throw new ValidationException(
+					String.format(
+						"Invalid metadata key '%s' in column '%s' of table '%s'. " +
+							"The %s class '%s' supports the following metadata keys for writing:\n%s",
+						metadataKey,
+						metadataColumn.getName(),
+						sinkIdentifier.asSummaryString(),
+						DynamicTableSink.class.getSimpleName(),
+						sink.getClass().getName(),
+						String.join("\n", metadataMap.keySet())
+					)
+				);
+			}
+			// check that types are compatible
+			if (!supportsExplicitCast(metadataType, expectedMetadataDataType.getLogicalType())) {
+				throw new ValidationException(
+					String.format(
+						"Invalid data type for metadata key '%s' in column '%s' of table '%s'. " +
+							"The declared type '%s' cannot be cast to consumed metadata type '%s'.",
+						metadataKey,
+						metadataColumn.getName(),
+						sinkIdentifier.asSummaryString(),
+						metadataType,
+						expectedMetadataDataType.getLogicalType()
+					)
+				);
+			}
+		});
+
+		metadataSink.applyWritableMetadata(
+			createRequiredMetadataKeys(schema, sink),
+			TypeConversions.fromLogicalToDataType(createConsumedType(schema, sink)));
+	}
+
+	/**
+	 * Returns the {@link DataType} that a sink should consume as the output from the runtime.
+	 *
+	 * <p>The format looks as follows: {@code PHYSICAL COLUMNS + METADATA COLUMNS}

Review comment:
       ```suggestion
   	 * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS}
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org