Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/13 14:56:27 UTC

[GitHub] [flink] twalthr opened a new pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

twalthr opened a new pull request #13618:
URL: https://github.com/apache/flink/pull/13618


   ## What is the purpose of the change
   
   This enables metadata columns in the planner. Both `SupportsReadingMetadata` and `SupportsWritingMetadata` are fully supported for sources and sinks.
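
   For context, a metadata-writing sink roughly follows the shape sketched below (a minimal, illustrative example rather than code from this PR; the class name and metadata keys are assumptions, and the unrelated `DynamicTableSink` methods are omitted by keeping the class abstract):
   
   ```java
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.connector.sink.DynamicTableSink;
   import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
   import org.apache.flink.table.types.DataType;
   
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical sink that can persist a 'timestamp' and a 'headers' metadata column.
   public abstract class ExampleMetadataSink implements DynamicTableSink, SupportsWritingMetadata {
   
       // metadata keys requested by the planner, ordered by listWritableMetadata()
       private List<String> metadataKeys;
       // row type handed to the sink at runtime: physical columns + persisted metadata columns
       private DataType consumedDataType;
   
       @Override
       public Map<String, DataType> listWritableMetadata() {
           // declares which metadata keys this sink can persist and their expected data types
           final Map<String, DataType> metadata = new LinkedHashMap<>();
           metadata.put("timestamp", DataTypes.TIMESTAMP(3));
           metadata.put("headers", DataTypes.MAP(DataTypes.STRING(), DataTypes.BYTES()));
           return metadata;
       }
   
       @Override
       public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
           // called by the planner with the keys that are actually declared (non-VIRTUAL) in the DDL
           this.metadataKeys = metadataKeys;
           this.consumedDataType = consumedDataType;
       }
   }
   ```
   
   A column declared as `ts TIMESTAMP(3) METADATA FROM 'timestamp'` would then be routed to the `'timestamp'` key, whereas columns declared with `VIRTUAL` are read-only and never handed to the sink.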
   
   ## Brief change log
   
   - Reimplement `CatalogTable` to `RelNode` logic for sources.
   - Reimplement `CatalogTable` to `RelNode` logic for sinks.
   - Support `SupportsReadingMetadata`.
   - Support `SupportsWritingMetadata`.
   - Support `SupportsReadingMetadata` together with `SupportsProjectionPushDown`.
   - Update `TestValuesTableFactory` to support the new interfaces.
   - Several exception and code improvements.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `TableScanTest`
   - `TableSinkTest`
   - `TableSourceITCase`
   - `TableSinkITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106",
       "triggerID" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   * 4922a794b8792bcd5877ac7d08fc8c1bb82108aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099) 
   * 722efcead1a2e9d3fa3c11d8c6685850f7c5def6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr closed pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #13618:
URL: https://github.com/apache/flink/pull/13618


   





[GitHub] [flink] wuchong commented on a change in pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13618:
URL: https://github.com/apache/flink/pull/13618#discussion_r507819555



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -306,4 +301,101 @@ class TableSinkTest extends TableTestBase {
     util.verifyPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testExceptionForWritingVirtualMetadataColumn(): Unit = {
+    // test reordering, skipping, casting of (virtual) metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,
+         |  `m_2` INT METADATA FROM 'metadata_2',
+         |  `b` BIGINT,
+         |  `c` INT,
+         |  `metadata_1` STRING METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',
+         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT *
+        |FROM MetadataTable
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Query schema: [a: INT, m_3: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]\n" +
+      "Sink schema:  [a: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]")
+
+    util.verifyPlan(stmtSet)
+  }
+
+  @Test
+  def testExceptionForWritingInvalidMetadataColumn(): Unit = {
+    // test casting of metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `metadata_1` TIMESTAMP(3) METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'writable-metadata' = 'metadata_1:BOOLEAN'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT TIMESTAMP '1990-10-14 06:00:00.000'
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Invalid data type for metadata key 'metadata_1' in column 'metadata_1' of table " +

Review comment:
    It looks verbose because we repeat 'metadata_1' twice. Can we simplify it to `Invalid data type for metadata column 'metadata_1' of table` when no metadata alias is defined?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -262,6 +266,25 @@ public DataType toPhysicalRowDataType() {
 		return ROW(fields);
 	}
 
+	/**
+	 * Converts all persisted columns of this schema into a (possibly nested) row data type.
+	 *
+	 * <p>This method returns the query-to-sink schema.
+	 *
+	 * <p>Note: Computed columns and virtual columns are excluded from the returned row data type.
+	 *
+	 * @see DataTypes#ROW(Field...)
+	 * @see #toRowDataType()
+	 * @see #toPhysicalRowDataType()
+	 */
+	public DataType toPersistedRowDataType() {

Review comment:
    It seems that this method is only used in the planner, not by connectors. Should we move it to some internal utility class? At first glance, `toPhysicalRowDataType` vs `toPersistedRowDataType` confuses me a lot.
   
    Or we can add `The returned row data type contains the persisted metadata columns in addition to those of {@link toPhysicalRowDataType()}` to the Note.

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
##########
@@ -76,6 +76,27 @@ class TableScanTest extends TableTestBase {
     util.verifyPlan("SELECT * FROM t1")

Review comment:
       Could you add plan tests for projection pushdown and metadata pushdown?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableColumn.java
##########
@@ -270,6 +285,11 @@ public boolean isPhysical() {
 			return false;
 		}
 
+		@Override
+		public boolean isPersisted() {
+			return !isVirtual;
+		}
+
 		@Override
 		public Optional<String> explainExtras() {
 			final StringBuilder sb = new StringBuilder();

Review comment:
    Add single quotes around the `metadataAlias`?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
##########
@@ -306,4 +301,101 @@ class TableSinkTest extends TableTestBase {
     util.verifyPlan(stmtSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testExceptionForWritingVirtualMetadataColumn(): Unit = {
+    // test reordering, skipping, casting of (virtual) metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `m_3` INT METADATA FROM 'metadata_3' VIRTUAL,
+         |  `m_2` INT METADATA FROM 'metadata_2',
+         |  `b` BIGINT,
+         |  `c` INT,
+         |  `metadata_1` STRING METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT, metadata_3:BIGINT',
+         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BIGINT'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT *
+        |FROM MetadataTable
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Query schema: [a: INT, m_3: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]\n" +
+      "Sink schema:  [a: INT, m_2: INT, b: BIGINT, c: INT, metadata_1: STRING]")
+
+    util.verifyPlan(stmtSet)
+  }
+
+  @Test
+  def testExceptionForWritingInvalidMetadataColumn(): Unit = {
+    // test casting of metadata columns
+    util.addTable(
+      s"""
+         |CREATE TABLE MetadataTable (
+         |  `a` INT,
+         |  `metadata_1` TIMESTAMP(3) METADATA
+         |) WITH (
+         |  'connector' = 'values',
+         |  'writable-metadata' = 'metadata_1:BOOLEAN'
+         |)
+       """.stripMargin)
+
+    val sql =
+      """
+        |INSERT INTO MetadataTable
+        |SELECT TIMESTAMP '1990-10-14 06:00:00.000'
+        |""".stripMargin
+    val stmtSet = util.tableEnv.createStatementSet()
+    stmtSet.addInsertSql(sql)
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage(
+      "Invalid data type for metadata key 'metadata_1' in column 'metadata_1' of table " +
+      "'default_catalog.default_database.MetadataTable'. The declared type 'TIMESTAMP(3)' cannot " +
+      "be cast to consumed metadata type 'BOOLEAN'.")

Review comment:
    It may be confusing to users what "consumed" means here. Can we say something like `The metadata 'metadata_1' cannot be declared as 'TIMESTAMP(3)', but it can be declared with any type that is castable to 'BOOLEAN'.`?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/DynamicSinkUtils.java
##########
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.sinks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.TableColumn.MetadataColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.operations.CatalogSinkModifyOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
+import org.apache.flink.table.planner.plan.schema.CatalogSourceTable;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.TypeTransformations;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsAvoidingCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsExplicitCast;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
+
+/**
+ * Utilities for dealing with {@link DynamicTableSink}.
+ */
+@Internal
+public final class DynamicSinkUtils {
+
+	/**
+	 * Similar to {@link CatalogSourceTable#toRel(RelOptTable.ToRelContext)}, converts a given
+	 * {@link DynamicTableSink} to a {@link RelNode}. It adds helper projections if necessary.
+	 */
+	public static RelNode toRel(
+			FlinkRelBuilder relBuilder,
+			RelNode input,
+			CatalogSinkModifyOperation sinkOperation,
+			DynamicTableSink sink,
+			CatalogTable table) {
+		final FlinkTypeFactory typeFactory = ShortcutUtils.unwrapTypeFactory(relBuilder);
+		final TableSchema schema = table.getSchema();
+
+		// 1. prepare table sink
+		prepareDynamicSink(sinkOperation, sink, table);
+
+		// 2. validate the query schema to the sink's table schema and apply cast if possible
+		final RelNode query = validateSchemaAndApplyImplicitCast(
+			input,
+			schema,
+			sinkOperation.getTableIdentifier(),
+			typeFactory);
+		relBuilder.push(query);
+
+		// 3. convert the sink's table schema to the consumed data type of the sink
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+		if (!metadataColumns.isEmpty()) {
+			pushMetadataProjection(relBuilder, typeFactory, schema, sink);
+		}
+
+		final RelNode finalQuery = relBuilder.build();
+
+		return LogicalSink.create(
+			finalQuery,
+			sinkOperation.getTableIdentifier(),
+			table,
+			sink,
+			sinkOperation.getStaticPartitions());
+	}
+
+	/**
+	 * Checks if the given query can be written into the given sink's table schema.
+	 *
+	 * <p>It checks whether field types are compatible (types should be equal including precisions).
+	 * If types are not compatible, but can be implicitly cast, a cast projection will be applied.
+	 * Otherwise, an exception will be thrown.
+	 */
+	public static RelNode validateSchemaAndApplyImplicitCast(
+			RelNode query,
+			TableSchema sinkSchema,
+			@Nullable ObjectIdentifier sinkIdentifier,
+			FlinkTypeFactory typeFactory) {
+		final RowType queryType = FlinkTypeFactory.toLogicalRowType(query.getRowType());
+		final List<RowField> queryFields = queryType.getFields();
+
+		final RowType sinkType = (RowType) fixSinkDataType(sinkSchema.toPersistedRowDataType()).getLogicalType();
+		final List<RowField> sinkFields = sinkType.getFields();
+
+		if (queryFields.size() != sinkFields.size()) {
+			throw createSchemaMismatchException(
+				"Different number of columns.",
+				sinkIdentifier,
+				queryFields,
+				sinkFields);
+		}
+
+		boolean requiresCasting = false;
+		for (int i = 0; i < sinkFields.size(); i++) {
+			final LogicalType queryColumnType = queryFields.get(i).getType();
+			final LogicalType sinkColumnType = sinkFields.get(i).getType();
+			if (!supportsImplicitCast(queryColumnType, sinkColumnType)) {
+				throw createSchemaMismatchException(
+					String.format(
+						"Incompatible types for sink column '%s' at position %s.",
+						sinkFields.get(i).getName(),
+						i),
+					sinkIdentifier,
+					queryFields,
+					sinkFields);
+			}
+			if (!supportsAvoidingCast(queryColumnType, sinkColumnType)) {
+				requiresCasting = true;
+			}
+		}
+
+		if (requiresCasting) {
+			final RelDataType castRelDataType = typeFactory.buildRelNodeRowType(sinkType);
+			return RelOptUtil.createCastRel(query, castRelDataType, true);
+		}
+		return query;
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a projection that reorders physical and metadata columns according to the consumed data
+	 * type of the sink. It casts metadata columns into the expected data type.
+	 *
+	 * @see SupportsWritingMetadata
+	 */
+	private static void pushMetadataProjection(
+			FlinkRelBuilder relBuilder,
+			FlinkTypeFactory typeFactory,
+			TableSchema schema,
+			DynamicTableSink sink) {
+		final RexBuilder rexBuilder = relBuilder.getRexBuilder();
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+
+		final List<Integer> physicalColumns = extractPhysicalColumns(schema);
+
+		final Map<String, Integer> keyToMetadataColumn = extractPersistedMetadataColumns(schema)
+			.stream()
+			.collect(
+				Collectors.toMap(
+					pos -> {
+						final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+						return metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+					},
+					Function.identity()
+				)
+			);
+
+		final List<Integer> metadataColumns = createRequiredMetadataKeys(schema, sink)
+			.stream()
+			.map(keyToMetadataColumn::get)
+			.collect(Collectors.toList());
+
+		final List<String> fieldNames = Stream
+			.concat(
+				physicalColumns.stream()
+					.map(tableColumns::get)
+					.map(TableColumn::getName),
+				metadataColumns.stream()
+					.map(tableColumns::get)
+					.map(MetadataColumn.class::cast)
+					.map(c -> c.getMetadataAlias().orElse(c.getName()))
+			)
+			.collect(Collectors.toList());
+
+		final Map<String, DataType> metadataMap = extractMetadataMap(sink);
+
+		final List<RexNode> fieldNodes = Stream
+			.concat(
+				physicalColumns
+					.stream()
+					.map(pos -> {
+						final int posAdjusted = adjustByVirtualColumns(tableColumns, pos);
+						return relBuilder.field(posAdjusted);
+					}),
+				metadataColumns
+					.stream()
+					.map(pos -> {
+						final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+						final String metadataKey = metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+
+						final LogicalType expectedType = metadataMap.get(metadataKey).getLogicalType();
+						final RelDataType expectedRelDataType = typeFactory.createFieldTypeFromLogicalType(expectedType);
+
+						final int posAdjusted = adjustByVirtualColumns(tableColumns, pos);
+						return rexBuilder.makeAbstractCast(expectedRelDataType, relBuilder.field(posAdjusted));
+					})
+			)
+			.collect(Collectors.toList());
+
+		relBuilder.projectNamed(fieldNodes, fieldNames, true);
+	}
+
+	/**
+	 * Prepares the given {@link DynamicTableSink}. It checks whether the sink is compatible with the
+	 * INSERT INTO clause and applies initial parameters.
+	 */
+	private static void prepareDynamicSink(
+			CatalogSinkModifyOperation sinkOperation,
+			DynamicTableSink sink,
+			CatalogTable table) {
+		final ObjectIdentifier sinkIdentifier = sinkOperation.getTableIdentifier();
+
+		validatePartitioning(sinkOperation, sinkIdentifier, sink, table.getPartitionKeys());
+
+		validateAndApplyOverwrite(sinkOperation, sinkIdentifier, sink);
+
+		validateAndApplyMetadata(sinkIdentifier, sink, table.getSchema());
+	}
+
+	/**
+	 * Returns a list of required metadata keys. Ordered by the iteration order of
+	 * {@link SupportsWritingMetadata#listWritableMetadata()}.
+	 *
+	 * <p>This method assumes that sink and schema have been validated via
+	 * {@link #prepareDynamicSink(CatalogSinkModifyOperation, DynamicTableSink, CatalogTable)}.
+	 */
+	private static List<String> createRequiredMetadataKeys(TableSchema schema, DynamicTableSink sink) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+
+		final Set<String> requiredMetadataKeys = metadataColumns
+			.stream()
+			.map(tableColumns::get)
+			.map(MetadataColumn.class::cast)
+			.map(c -> c.getMetadataAlias().orElse(c.getName()))
+			.collect(Collectors.toSet());
+
+		final Map<String, DataType> metadataMap = extractMetadataMap(sink);
+
+		return metadataMap.keySet()
+			.stream()
+			.filter(requiredMetadataKeys::contains)
+			.collect(Collectors.toList());
+	}
+
+	private static ValidationException createSchemaMismatchException(
+			String cause,
+			@Nullable ObjectIdentifier sinkIdentifier,
+			List<RowField> queryFields,
+			List<RowField> sinkFields) {
+		final String querySchema = queryFields.stream()
+			.map(f -> f.getName() + ": " + f.getType().asSummaryString())
+			.collect(Collectors.joining(", ", "[", "]"));
+		final String sinkSchema = sinkFields.stream()
+			.map(sinkField -> sinkField.getName() + ": " + sinkField.getType().asSummaryString())
+			.collect(Collectors.joining(", ", "[", "]"));
+		final String tableName;
+		if (sinkIdentifier != null) {
+			tableName = "registered table '" + sinkIdentifier.asSummaryString() + "'";
+		} else {
+			tableName = "unregistered table";
+		}
+
+		return new ValidationException(
+			String.format(
+				"Column types of query result and sink for %s do not match.\n" +
+					"Cause: %s\n\n" +
+					"Query schema: %s\n" +
+					"Sink schema:  %s",
+				tableName,
+				cause,
+				querySchema,
+				sinkSchema
+			)
+		);
+	}
+
+	private static DataType fixSinkDataType(DataType sinkDataType) {
+		// we treat legacy decimal the same as the default decimal
+		// we ignore NULL constraint, the NULL constraint will be checked during runtime
+		// see StreamExecSink and BatchExecSink
+		return DataTypeUtils.transform(
+			sinkDataType,
+			TypeTransformations.legacyDecimalToDefaultDecimal(),
+			TypeTransformations.legacyRawToTypeInfoRaw(),
+			TypeTransformations.toNullable());
+	}
+
+	private static void validatePartitioning(
+			CatalogSinkModifyOperation sinkOperation,
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink,
+			List<String> partitionKeys) {
+		if (!partitionKeys.isEmpty()) {
+			if (!(sink instanceof SupportsPartitioning)) {
+				throw new TableException(
+					String.format(
+						"Table '%s' is a partitioned table, but the underlying %s doesn't " +
+							"implement the %s interface.",
+						sinkIdentifier.asSummaryString(),
+						DynamicTableSink.class.getSimpleName(),
+						SupportsPartitioning.class.getSimpleName()
+					)
+				);
+			}
+		}
+
+		final Map<String, String> staticPartitions = sinkOperation.getStaticPartitions();
+		staticPartitions.keySet().forEach(p -> {
+			if (!partitionKeys.contains(p)) {
+				throw new ValidationException(
+					String.format(
+						"Static partition column '%s' should be in the partition keys list %s for table '%s'.",
+						p,
+						partitionKeys,
+						sinkIdentifier.asSummaryString()
+					)
+				);
+			}
+		});
+	}
+
+	private static void validateAndApplyOverwrite(
+			CatalogSinkModifyOperation sinkOperation,
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink) {
+		if (!sinkOperation.isOverwrite()) {
+			return;
+		}
+		if (!(sink instanceof SupportsOverwrite)) {
+			throw new ValidationException(
+				String.format(
+					"INSERT OVERWRITE requires that the underlying %s of table '%s' " +
+						"implements the %s interface.",
+					DynamicTableSink.class.getSimpleName(),
+					sinkIdentifier.asSummaryString(),
+					SupportsOverwrite.class.getSimpleName()
+				)
+			);
+		}
+		final SupportsOverwrite overwriteSink = (SupportsOverwrite) sink;
+		overwriteSink.applyOverwrite(sinkOperation.isOverwrite());
+	}
+
+	private static List<Integer> extractPhysicalColumns(TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		return IntStream.range(0, schema.getFieldCount())
+			.filter(pos -> tableColumns.get(pos).isPhysical())
+			.boxed()
+			.collect(Collectors.toList());
+	}
+
+	private static List<Integer> extractPersistedMetadataColumns(TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		return IntStream.range(0, schema.getFieldCount())
+			.filter(pos -> {
+				final TableColumn tableColumn = tableColumns.get(pos);
+				return tableColumn instanceof MetadataColumn && tableColumn.isPersisted();
+			})
+			.boxed()
+			.collect(Collectors.toList());
+	}
+
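+	/**
+	 * Shifts a position in the full table schema to the corresponding field position in the query
+	 * by subtracting the non-persisted (virtual metadata and computed) columns that precede it.
+	 */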
+	private static int adjustByVirtualColumns(List<TableColumn> tableColumns, int pos) {
+		return pos - (int) IntStream.range(0, pos).filter(i -> !tableColumns.get(i).isPersisted()).count();
+	}
+
+	private static Map<String, DataType> extractMetadataMap(DynamicTableSink sink) {
+		if (sink instanceof SupportsWritingMetadata) {
+			return ((SupportsWritingMetadata) sink).listWritableMetadata();
+		}
+		return Collections.emptyMap();
+	}
+
+	private static void validateAndApplyMetadata(
+			ObjectIdentifier sinkIdentifier,
+			DynamicTableSink sink,
+			TableSchema schema) {
+		final List<TableColumn> tableColumns = schema.getTableColumns();
+		final List<Integer> metadataColumns = extractPersistedMetadataColumns(schema);
+
+		if (metadataColumns.isEmpty()) {
+			return;
+		}
+
+		if (!(sink instanceof SupportsWritingMetadata)) {
+			throw new ValidationException(
+				String.format(
+					"Table '%s' declares persistable metadata columns, but the underlying %s " +
+						"doesn't implement the %s interface. If the column should not " +
+						"be persisted, it can be declared with the VIRTUAL keyword.",
+					sinkIdentifier.asSummaryString(),
+					DynamicTableSink.class.getSimpleName(),
+					SupportsWritingMetadata.class.getSimpleName()
+				)
+			);
+		}
+
+		final SupportsWritingMetadata metadataSink = (SupportsWritingMetadata) sink;
+
+		final Map<String, DataType> metadataMap = ((SupportsWritingMetadata) sink).listWritableMetadata();
+		metadataColumns.forEach(pos -> {
+			final MetadataColumn metadataColumn = (MetadataColumn) tableColumns.get(pos);
+			final String metadataKey = metadataColumn.getMetadataAlias().orElse(metadataColumn.getName());
+			final LogicalType metadataType = metadataColumn.getType().getLogicalType();
+			final DataType expectedMetadataDataType = metadataMap.get(metadataKey);
+			// check that metadata key is valid
+			if (expectedMetadataDataType == null) {
+				throw new ValidationException(
+					String.format(
+						"Invalid metadata key '%s' in column '%s' of table '%s'. " +
+							"The %s class '%s' supports the following metadata keys for writing:\n%s",
+						metadataKey,
+						metadataColumn.getName(),
+						sinkIdentifier.asSummaryString(),
+						DynamicTableSink.class.getSimpleName(),
+						sink.getClass().getName(),
+						String.join("\n", metadataMap.keySet())
+					)
+				);
+			}
+			// check that types are compatible
+			if (!supportsExplicitCast(metadataType, expectedMetadataDataType.getLogicalType())) {
+				throw new ValidationException(
+					String.format(
+						"Invalid data type for metadata key '%s' in column '%s' of table '%s'. " +
+							"The declared type '%s' cannot be cast to consumed metadata type '%s'.",
+						metadataKey,
+						metadataColumn.getName(),
+						sinkIdentifier.asSummaryString(),
+						metadataType,
+						expectedMetadataDataType.getLogicalType()
+					)
+				);
+			}
+		});
+
+		metadataSink.applyWritableMetadata(
+			createRequiredMetadataKeys(schema, sink),
+			TypeConversions.fromLogicalToDataType(createConsumedType(schema, sink)));
+	}
+
+	/**
+	 * Returns the {@link DataType} that a sink should consume as the output from the runtime.
+	 *
+	 * <p>The format looks as follows: {@code PHYSICAL COLUMNS + METADATA COLUMNS}

Review comment:
       ```suggestion
   	 * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED METADATA COLUMNS}
   ```







[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   * 4922a794b8792bcd5877ac7d08fc8c1bb82108aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr commented on a change in pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13618:
URL: https://github.com/apache/flink/pull/13618#discussion_r510059657



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
##########
@@ -76,6 +76,27 @@ class TableScanTest extends TableTestBase {
     util.verifyPlan("SELECT * FROM t1")

Review comment:
       Very good point! I even found a bug.







[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   * 4922a794b8792bcd5877ac7d08fc8c1bb82108aa UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] twalthr commented on a change in pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13618:
URL: https://github.com/apache/flink/pull/13618#discussion_r509437891



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -262,6 +266,25 @@ public DataType toPhysicalRowDataType() {
 		return ROW(fields);
 	}
 
+	/**
+	 * Converts all persisted columns of this schema into a (possibly nested) row data type.
+	 *
+	 * <p>This method returns the query-to-sink schema.
+	 *
+	 * <p>Note: Computed columns and virtual columns are excluded from the returned row data type.
+	 *
+	 * @see DataTypes#ROW(Field...)
+	 * @see #toRowDataType()
+	 * @see #toPhysicalRowDataType()
+	 */
+	public DataType toPersistedRowDataType() {

Review comment:
       I thought we should give users a method to show the `query-to-sink` schema. It is not used for connectors and also not very useful in the planner. That's why it is in a separate commit. I can remove it again.







[GitHub] [flink] flinkbot commented on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106",
       "triggerID" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4922a794b8792bcd5877ac7d08fc8c1bb82108aa Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099) 
   * 722efcead1a2e9d3fa3c11d8c6685850f7c5def6 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707799718


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit ad4d51bd6104da7e25c51973aaef9b45056907a4 (Tue Oct 13 14:59:13 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106",
       "triggerID" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 722efcead1a2e9d3fa3c11d8c6685850f7c5def6 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8106) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13618: [FLINK-19274][table] Implement SupportsReadingMetadata and SupportsWritingMetadata for sources and sinks

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13618:
URL: https://github.com/apache/flink/pull/13618#issuecomment-707809605


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544",
       "triggerID" : "ad4d51bd6104da7e25c51973aaef9b45056907a4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099",
       "triggerID" : "4922a794b8792bcd5877ac7d08fc8c1bb82108aa",
       "triggerType" : "PUSH"
     }, {
       "hash" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "722efcead1a2e9d3fa3c11d8c6685850f7c5def6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ad4d51bd6104da7e25c51973aaef9b45056907a4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7544) 
   * 4922a794b8792bcd5877ac7d08fc8c1bb82108aa Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8099) 
   * 722efcead1a2e9d3fa3c11d8c6685850f7c5def6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>

