Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/21 16:32:53 UTC

[GitHub] [flink] twalthr opened a new pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

twalthr opened a new pull request #13732:
URL: https://github.com/apache/flink/pull/13732


   ## What is the purpose of the change
   
   This updates the `KafkaDynamicSource` and `KafkaDynamicSink` to read and
   write metadata according to FLIP-107. Reading and writing metadata of formats
   is not supported yet.
   
   This PR is based on #13618.
   
   ## Brief change log
   
   - Implement `SupportsReadingMetadata` and `SupportsWritingMetadata` in source and sink
   - Refactor Kafka source and sink accordingly
   - Various minor improvements (see separate commits)
   
   ## Verifying this change
   
   - Added a test in `KafkaTableITCase`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3f19217861e90c324e6c02a1fde7b705381b72d8",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040",
       "triggerID" : "3f19217861e90c324e6c02a1fde7b705381b72d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3f19217861e90c324e6c02a1fde7b705381b72d8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510231299



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       I see.







[GitHub] [flink] flinkbot commented on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713703980


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 3f19217861e90c324e6c02a1fde7b705381b72d8 (Wed Oct 21 16:35:57 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The bot tracks the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510144202



##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}

Review comment:
       I might not understand your comment here.







[GitHub] [flink] wuchong commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r509926611



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
##########
@@ -140,7 +140,7 @@
 		 *
 		 * @see TableSchema#toPhysicalRowDataType()
 		 */
-		TypeInformation<?> createTypeInformation(DataType consumedDataType);
+		<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);

Review comment:
       This is a nice improvement. But is this a compatible change?

##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
+	 * with this implementation in future versions.
+	 */
+	public static boolean deepEquals(Row row, Object other) {
+		return deepEqualsInternal(row, other);
+	}
+
+	/**
+	 * Compares two {@link List}s of {@link Row} for deep equality. This method supports all conversion
+	 * classes of the table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
+	 * with this implementation in future versions.
+	 */
+	public static boolean deepEquals(List<Row> l1, List<Row> l2) {
+		return deepEqualsInternal(l1, l2);
+	}
+
+	private static boolean deepEqualsInternal(Object o1, Object o2) {
+		if (o1 == o2) {
+			return true;
+		} else if (o1 == null || o2 == null) {
+			return false;
+		} else if (o1 instanceof Row && o2 instanceof Row) {
+			return deepEqualsRow((Row) o1, (Row) o2);
+		} else if (o1 instanceof Object[] && o2 instanceof Object[]) {
+			return deepEqualsArray((Object[]) o1, (Object[]) o2);
+		} else if (o1 instanceof Map && o2 instanceof Map) {
+			return deepEqualsMap((Map<?, ?>) o1, (Map<?, ?>) o2);
+		} else if (o1 instanceof List && o2 instanceof List) {
+			return deepEqualsList((List<?>) o1, (List<?>) o2);
+		}
+		return Objects.deepEquals(o1, o2);
+	}
+
+	private static boolean deepEqualsRow(Row row1, Row row2) {
+		if (row1.getKind() != row2.getKind()) {
+			return false;
+		}
+		if (row1.getArity() != row2.getArity()) {
+			return false;
+		}
+		for (int pos = 0; pos < row1.getArity(); pos++) {
+			final Object f1 = row1.getField(pos);
+			final Object f2 = row2.getField(pos);
+			if (!deepEqualsInternal(f1, f2)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static boolean deepEqualsArray(Object[] a1, Object[] a2) {
+		if (a1.getClass() != a2.getClass()) {
+			return false;
+		}
+		if (a1.length != a2.length) {
+			return false;
+		}
+		for (int pos = 0; pos < a1.length; pos++) {
+			final Object e1 = a1[pos];
+			final Object e2 = a2[pos];
+			if (!deepEqualsInternal(e1, e2)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static <K, V> boolean deepEqualsMap(Map<K, V> m1, Map<?, ?> m2) {
+		// copied from HashMap.equals but with deepEquals comparison
+		if (m1.size() != m2.size()) {
+			return false;
+		}
+		try {
+			for (Map.Entry<K, V> e : m1.entrySet()) {
+				K key = e.getKey();
+				V value = e.getValue();
+				if (value == null) {
+					if (!(m2.get(key) == null && m2.containsKey(key))) {
+						return false;
+					}
+				} else {
+					if (!deepEqualsInternal(value, m2.get(key))) {
+						return false;
+					}
+				}
+			}
+		} catch (ClassCastException | NullPointerException unused) {
+			return false;
+		}
+		return true;
+	}
+
+	private static <E> boolean deepEqualsList(List<E> l1, List<?> l2) {
+		final Iterator<E> i1 = l1.iterator();
+		final Iterator<?> i2 = l2.iterator();
+		while (i1.hasNext() && i2.hasNext()) {

Review comment:
       Why not compare size first? 
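
A minimal sketch of what the size-first variant could look like. `ListDeepEqualsSketch` is a hypothetical stand-alone class, not code from the PR, and it uses `Objects.deepEquals` per element where the PR uses its own `deepEqualsInternal`. The early exit assumes `size()` is cheap for the lists involved, which may not hold for every `List` implementation.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/** Hypothetical helper illustrating the review suggestion; not the PR's code. */
final class ListDeepEqualsSketch {

    static boolean deepEqualsList(List<?> l1, List<?> l2) {
        // Early exit: lists of different length can never be equal.
        if (l1.size() != l2.size()) {
            return false;
        }
        final Iterator<?> i1 = l1.iterator();
        final Iterator<?> i2 = l2.iterator();
        while (i1.hasNext() && i2.hasNext()) {
            // Objects.deepEquals compares arrays by content, not by reference.
            if (!Objects.deepEquals(i1.next(), i2.next())) {
                return false;
            }
        }
        // Defensive check in case one iterator still has elements left.
        return !(i1.hasNext() || i2.hasNext());
    }

    public static void main(String[] args) {
        List<Object> a = Arrays.asList(1, null, new int[] {1, 2});
        List<Object> b = Arrays.asList(1, null, new int[] {1, 2});
        System.out.println(deepEqualsList(a, b));
        System.out.println(deepEqualsList(a, Arrays.asList(1, null)));
    }
}
```

The iterator loop is kept after the size check so that element order still matters. Only the mismatched-length case is short-circuited.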

##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}

Review comment:
       It seems that we already support this?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -80,23 +121,39 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
 
 	@Override
 	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-		final SerializationSchema<RowData> serializationSchema =
-				this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType);
+		final SerializationSchema<RowData> valueSerialization =
+				this.encodingFormat.createRuntimeEncoder(context, this.physicalDataType);
 
-		final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(serializationSchema);
+		final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(valueSerialization);
 
 		return SinkFunctionProvider.of(kafkaProducer);
 	}
 
+	@Override
+	public Map<String, DataType> listWritableMetadata() {
+		final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+		Stream.of(WritableMetadata.values()).forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+		return metadataMap;
+	}
+
+	@Override
+	public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
+		this.consumedDataType = consumedDataType;

Review comment:
       Could we remove the `consumedDataType` as it is never used?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",
+			DataTypes.INT().nullable(),
+			record -> record.leaderEpoch().orElse(null)
+		),
+
+		OFFSET(
+			"offset",
+			DataTypes.BIGINT().notNull(),
+			ConsumerRecord::offset),
+
+		TIMESTAMP(
+			"timestamp",
+			DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+			record -> TimestampData.fromEpochMillis(record.timestamp())),
+
+		TIMESTAMP_TYPE(
+			"timestamp-type",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.timestampType().toString())
+		);
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class MetadataKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {

Review comment:
       Declare `serialVersionUID` for this class. 
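
For illustration, a sketch of what declaring the field looks like on a serializable class. `SerialVersionSketch` and its `roundTrip` helper are hypothetical names. The value `1L` is an assumption about the usual convention, not something taken from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a serializable schema class; not the PR's code. */
final class SerialVersionSketch implements Serializable {

    // Explicit version of the serialized form. Without this field the JVM
    // derives a value from the class structure, so unrelated refactorings
    // can make previously serialized instances unreadable.
    private static final long serialVersionUID = 1L;

    private final int[] metadataPositions;

    SerialVersionSketch(int[] metadataPositions) {
        this.metadataPositions = metadataPositions;
    }

    int[] getMetadataPositions() {
        return metadataPositions;
    }

    /** Serializes and deserializes the instance with plain Java serialization. */
    static SerialVersionSketch roundTrip(SerialVersionSketch in) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream oin =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerialVersionSketch) oin.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SerialVersionSketch copy = roundTrip(new SerialVersionSketch(new int[] {2, -1}));
        System.out.println(copy.getMetadataPositions().length);
    }
}
```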

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -134,13 +195,233 @@ public int hashCode() {
 
 	// --------------------------------------------------------------------------------------------
 
-	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> serializationSchema) {
+	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> valueSerialization) {
+		final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+
+		final RowData.FieldGetter[] physicalFieldGetters = IntStream.range(0, physicalChildren.size())
+				.mapToObj(pos -> RowData.createFieldGetter(physicalChildren.get(pos), pos))
+				.toArray(RowData.FieldGetter[]::new);
+
+		// determine the positions of metadata in the consumed row
+		final int[] metadataPositions = Stream.of(WritableMetadata.values())
+				.mapToInt(m -> {
+					final int pos = metadataKeys.indexOf(m.key);
+					if (pos < 0) {
+						return -1;
+					}
+					return physicalChildren.size() + pos;
+				})
+				.toArray();
+
+		final MetadataKafkaSerializationSchema kafkaSerializer = new MetadataKafkaSerializationSchema(
+				topic,
+				partitioner.orElse(null),
+				valueSerialization,
+				metadataPositions,
+				physicalFieldGetters);
+
 		return new FlinkKafkaProducer<>(
 			topic,
-			serializationSchema,
+			kafkaSerializer,
 			properties,
-			partitioner.orElse(null),
 			FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
 			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum WritableMetadata {
+
+		HEADERS(
+				"headers",
+				// key and value of the map are nullable to make handling easier in queries
+				DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					final MapData map = row.getMap(pos);
+					final ArrayData keyArray = map.keyArray();
+					final ArrayData valueArray = map.valueArray();
+					final List<Header> headers = new ArrayList<>();
+					for (int i = 0; i < keyArray.size(); i++) {
+						if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+							final String key = keyArray.getString(i).toString();
+							final byte[] value = valueArray.getBinary(i);
+							headers.add(new KafkaHeader(key, value));
+						}
+					}
+					return headers;
+				}
+		),
+
+		TIMESTAMP(
+				"timestamp",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					return row.getTimestamp(pos, 3).getMillisecond();
+				});
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	private static class MetadataKafkaSerializationSchema

Review comment:
       Could we move this class out of `KafkaDynamicSink`? I believe it will grow considerably in the near future, once we support key/value formats.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       I just found that if we define a metadata key with a dash `-` separator, the column has to be declared using a `FROM` clause or escaped, e.g. `leader_epoch INT FROM 'leader-epoch'`. What do you think about changing the key to `leader_epoch`, which is more compliant with SQL identifiers?
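
To make the concern concrete, here is a rough sketch that checks whether a key could serve directly as an unescaped column name. The regular expression is a simplified assumption about unquoted SQL identifiers (it ignores reserved keywords and is not Flink's actual parser rule), and `MetadataKeySketch` is a hypothetical name.

```java
import java.util.regex.Pattern;

/** Hypothetical helper; the identifier rule is a simplification, not Flink's grammar. */
final class MetadataKeySketch {

    // Assumption: an unquoted identifier is a letter or underscore followed by
    // letters, digits, or underscores. Reserved keywords are not considered.
    private static final Pattern UNQUOTED_IDENTIFIER =
            Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean usableWithoutEscaping(String key) {
        return UNQUOTED_IDENTIFIER.matcher(key).matches();
    }

    public static void main(String[] args) {
        System.out.println(usableWithoutEscaping("leader-epoch"));
        System.out.println(usableWithoutEscaping("leader_epoch"));
    }
}
```

Under this simplified rule, `leader-epoch` and `timestamp-type` would need escaping or a `FROM` clause, while `leader_epoch` would not.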

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -134,13 +195,233 @@ public int hashCode() {
 
 	// --------------------------------------------------------------------------------------------
 
-	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> serializationSchema) {
+	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> valueSerialization) {
+		final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+
+		final RowData.FieldGetter[] physicalFieldGetters = IntStream.range(0, physicalChildren.size())
+				.mapToObj(pos -> RowData.createFieldGetter(physicalChildren.get(pos), pos))
+				.toArray(RowData.FieldGetter[]::new);
+
+		// determine the positions of metadata in the consumed row
+		final int[] metadataPositions = Stream.of(WritableMetadata.values())
+				.mapToInt(m -> {
+					final int pos = metadataKeys.indexOf(m.key);
+					if (pos < 0) {
+						return -1;
+					}
+					return physicalChildren.size() + pos;
+				})
+				.toArray();
+
+		final MetadataKafkaSerializationSchema kafkaSerializer = new MetadataKafkaSerializationSchema(
+				topic,
+				partitioner.orElse(null),
+				valueSerialization,
+				metadataPositions,
+				physicalFieldGetters);
+
 		return new FlinkKafkaProducer<>(
 			topic,
-			serializationSchema,
+			kafkaSerializer,
 			properties,
-			partitioner.orElse(null),
 			FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
 			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum WritableMetadata {
+
+		HEADERS(
+				"headers",
+				// key and value of the map are nullable to make handling easier in queries
+				DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					final MapData map = row.getMap(pos);
+					final ArrayData keyArray = map.keyArray();
+					final ArrayData valueArray = map.valueArray();
+					final List<Header> headers = new ArrayList<>();
+					for (int i = 0; i < keyArray.size(); i++) {
+						if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+							final String key = keyArray.getString(i).toString();
+							final byte[] value = valueArray.getBinary(i);
+							headers.add(new KafkaHeader(key, value));
+						}
+					}
+					return headers;
+				}
+		),
+
+		TIMESTAMP(
+				"timestamp",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					return row.getTimestamp(pos, 3).getMillisecond();
+				});
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	private static class MetadataKafkaSerializationSchema
+			implements KafkaSerializationSchema<RowData>, KafkaContextAware<RowData> {
+
+		private final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+		private final String topic;
+
+		private final SerializationSchema<RowData> valueSerialization;
+
+		/**
+		 * Contains the position for each value of {@link WritableMetadata} in the consumed row or
+		 * -1 if this metadata key is not used.
+		 */
+		private final int[] metadataPositions;
+
+		private final RowData.FieldGetter[] physicalFieldGetters;
+
+		private int[] partitions;
+
+		private int parallelInstanceId;
+
+		private int numParallelInstances;
+
+		MetadataKafkaSerializationSchema(
+				String topic,
+				@Nullable FlinkKafkaPartitioner<RowData> partitioner,
+				SerializationSchema<RowData> valueSerialization,
+				int[] metadataPositions,
+				RowData.FieldGetter[] physicalFieldGetters) {
+			this.topic = topic;
+			this.partitioner = partitioner;
+			this.valueSerialization = valueSerialization;
+			this.metadataPositions = metadataPositions;
+			this.physicalFieldGetters = physicalFieldGetters;
+		}
+
+		@Override
+		public void open(SerializationSchema.InitializationContext context) throws Exception {
+			valueSerialization.open(context);
+			if (partitioner != null) {
+				partitioner.open(parallelInstanceId, numParallelInstances);
+			}
+		}
+
+		@Override
+		public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
+			final int physicalArity = physicalFieldGetters.length;
+			final boolean hasMetadata = physicalArity != consumedRow.getArity();

Review comment:
       `hasMetadata` can be determined from `metadataPositions` in the constructor, which allows JIT compiler optimization.
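
A sketch of this suggestion, assuming (as the Javadoc in the diff states) that `-1` marks an unused metadata key. `HasMetadataSketch` is a hypothetical, stripped-down class. The flag becomes a `final` field computed once, rather than a value derived for every record.

```java
import java.util.stream.IntStream;

/** Hypothetical, reduced version of the serialization schema; not the PR's code. */
final class HasMetadataSketch {

    private final int[] metadataPositions;

    // Computed once at construction time instead of per serialized record.
    private final boolean hasMetadata;

    HasMetadataSketch(int[] metadataPositions) {
        this.metadataPositions = metadataPositions;
        // A position of -1 means the metadata key is not used.
        this.hasMetadata = IntStream.of(metadataPositions).anyMatch(pos -> pos >= 0);
    }

    boolean hasMetadata() {
        return hasMetadata;
    }

    int positionOf(int metadataIndex) {
        return metadataPositions[metadataIndex];
    }

    public static void main(String[] args) {
        System.out.println(new HasMetadataSketch(new int[] {-1, -1}).hasMetadata());
        System.out.println(new HasMetadataSketch(new int[] {-1, 3}).hasMetadata());
    }
}
```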

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -134,13 +195,233 @@ public int hashCode() {
 
 	// --------------------------------------------------------------------------------------------
 
-	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> serializationSchema) {
+	protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> valueSerialization) {
+		final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
+
+		final RowData.FieldGetter[] physicalFieldGetters = IntStream.range(0, physicalChildren.size())
+				.mapToObj(pos -> RowData.createFieldGetter(physicalChildren.get(pos), pos))
+				.toArray(RowData.FieldGetter[]::new);
+
+		// determine the positions of metadata in the consumed row
+		final int[] metadataPositions = Stream.of(WritableMetadata.values())
+				.mapToInt(m -> {
+					final int pos = metadataKeys.indexOf(m.key);
+					if (pos < 0) {
+						return -1;
+					}
+					return physicalChildren.size() + pos;
+				})
+				.toArray();
+
+		final MetadataKafkaSerializationSchema kafkaSerializer = new MetadataKafkaSerializationSchema(
+				topic,
+				partitioner.orElse(null),
+				valueSerialization,
+				metadataPositions,
+				physicalFieldGetters);
+
 		return new FlinkKafkaProducer<>(
 			topic,
-			serializationSchema,
+			kafkaSerializer,
 			properties,
-			partitioner.orElse(null),
 			FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
 			FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum WritableMetadata {
+
+		HEADERS(
+				"headers",
+				// key and value of the map are nullable to make handling easier in queries
+				DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					final MapData map = row.getMap(pos);
+					final ArrayData keyArray = map.keyArray();
+					final ArrayData valueArray = map.valueArray();
+					final List<Header> headers = new ArrayList<>();
+					for (int i = 0; i < keyArray.size(); i++) {
+						if (!keyArray.isNullAt(i) && !valueArray.isNullAt(i)) {
+							final String key = keyArray.getString(i).toString();
+							final byte[] value = valueArray.getBinary(i);
+							headers.add(new KafkaHeader(key, value));
+						}
+					}
+					return headers;
+				}
+		),
+
+		TIMESTAMP(
+				"timestamp",
+				DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+				(row, pos) -> {
+					if (row.isNullAt(pos)) {
+						return null;
+					}
+					return row.getTimestamp(pos, 3).getMillisecond();
+				});
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	private static class MetadataKafkaSerializationSchema

Review comment:
       Add `serialVersionUID` to this class.
   

##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
+	 * with this implementation in future versions.
+	 */
+	public static boolean deepEquals(Row row, Object other) {

Review comment:
       Should `other` also be of type `Row`? The Javadoc says "Compares two Rows".

##########
File path: flink-core/src/test/java/org/apache/flink/types/RowTest.java
##########
@@ -103,4 +110,84 @@ public void testRowJoin() {
 		expected.setField(4, "hello world");
 		assertEquals(expected, joinedRow);
 	}
+
+	@Test
+	public void testDeepEquals() {
+		final Map<String, byte[]> originalMap = new HashMap<>();
+		originalMap.put("k1", new byte[]{1, 2, 3});
+		originalMap.put("k2", new byte[]{3, 4, 6});
+
+		final Row originalRow = Row.of(
+				RowKind.INSERT,
+				true,
+				new Integer[]{1, null, 3},
+				Arrays.asList(1, null, 3),
+				originalMap,
+				Collections.emptyMap(),
+				new int[][]{{1, 2, 3}, {}, {4, 5}},
+				1.44
+		);
+		assertTrue(Row.deepEquals(originalRow, originalRow));
+
+		{
+			// no diff
+			final Row row = Row.of(
+					RowKind.INSERT,
+					true,
+					new Integer[]{1, null, 3},
+					Arrays.asList(1, null, 3),
+					originalMap,
+					Collections.emptyMap(),
+					new int[][]{{1, 2, 3}, {}, {4, 5}},
+					1.44
+			);
+			assertTrue(Row.deepEquals(row, originalRow));
+		}
+
+		{
+			final Row row = Row.of(
+					RowKind.INSERT,
+					true,
+					new Integer[]{1, null, 3, 99}, // diff here
+					Arrays.asList(1, null, 3),
+					originalMap,

Review comment:
       This is the same map reference; should we create another map?
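
To illustrate why a separate map instance makes the test stronger, here is a small self-contained sketch in plain Java (the class and method names are hypothetical). A map compared with itself is trivially equal, while two maps holding equal-content `byte[]` values are not equal under `Map#equals`, because arrays compare by identity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; it only uses java.util and is not part of the PR.
final class MapReferenceSketch {

    // Builds a fresh map with freshly allocated byte arrays on every call.
    static Map<String, byte[]> newHeaders() {
        Map<String, byte[]> map = new HashMap<>();
        map.put("k1", new byte[]{1, 2, 3});
        map.put("k2", new byte[]{3, 4, 6});
        return map;
    }

    // Same reference on both sides: equal without inspecting any value.
    static boolean sameReferenceEquals() {
        Map<String, byte[]> map = newHeaders();
        return map.equals(map);
    }

    // Distinct maps with equal content: Map#equals delegates to byte[]#equals,
    // which is reference equality, so this returns false.
    static boolean separateMapsEquals() {
        return newHeaders().equals(newHeaders());
    }
}
```

With a second map instance, the test only passes if the comparison really descends into the array values.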

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",
+			DataTypes.INT().nullable(),
+			record -> record.leaderEpoch().orElse(null)
+		),
+
+		OFFSET(
+			"offset",
+			DataTypes.BIGINT().notNull(),
+			ConsumerRecord::offset),
+
+		TIMESTAMP(
+			"timestamp",
+			DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+			record -> TimestampData.fromEpochMillis(record.timestamp())),
+
+		TIMESTAMP_TYPE(
+			"timestamp-type",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.timestampType().toString())
+		);
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class MetadataKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+		private final DeserializationSchema<RowData> valueDeserialization;
+
+		private final MetadataAppendingCollector metadataAppendingCollector;
+
+		private final TypeInformation<RowData> producedTypeInfo;
+
+		MetadataKafkaDeserializationSchema(
+				DeserializationSchema<RowData> valueDeserialization,
+				MetadataConverter[] metadataConverters,
+				TypeInformation<RowData> producedTypeInfo) {
+			this.valueDeserialization = valueDeserialization;
+			this.metadataAppendingCollector = new MetadataAppendingCollector(metadataConverters);
+			this.producedTypeInfo = producedTypeInfo;
+		}
+
+		@Override
+		public void open(DeserializationSchema.InitializationContext context) throws Exception {
+			valueDeserialization.open(context);
+		}
+
+		@Override
+		public boolean isEndOfStream(RowData nextElement) {
+			return false;
+		}
+
+		@Override
+		public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+			throw new IllegalStateException("A collector is required for deserializing.");
+		}
+
+		@Override
+		public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
+			metadataAppendingCollector.inputRecord = record;
+			metadataAppendingCollector.outputCollector = collector;
+			valueDeserialization.deserialize(record.value(), metadataAppendingCollector);
+		}
+
+		@Override
+		public TypeInformation<RowData> getProducedType() {
+			return producedTypeInfo;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {

Review comment:
       Declare serialVersionUID for this class.
   
   

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",
+			DataTypes.INT().nullable(),
+			record -> record.leaderEpoch().orElse(null)
+		),
+
+		OFFSET(
+			"offset",
+			DataTypes.BIGINT().notNull(),
+			ConsumerRecord::offset),
+
+		TIMESTAMP(
+			"timestamp",
+			DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+			record -> TimestampData.fromEpochMillis(record.timestamp())),
+
+		TIMESTAMP_TYPE(
+			"timestamp-type",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.timestampType().toString())
+		);
+
+		final String key;
+
+		final DataType dataType;
+
+		final MetadataConverter converter;
+
+		ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+			this.key = key;
+			this.dataType = dataType;
+			this.converter = converter;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static class MetadataKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
+
+		private final DeserializationSchema<RowData> valueDeserialization;
+
+		private final MetadataAppendingCollector metadataAppendingCollector;
+
+		private final TypeInformation<RowData> producedTypeInfo;
+
+		MetadataKafkaDeserializationSchema(
+				DeserializationSchema<RowData> valueDeserialization,
+				MetadataConverter[] metadataConverters,
+				TypeInformation<RowData> producedTypeInfo) {
+			this.valueDeserialization = valueDeserialization;
+			this.metadataAppendingCollector = new MetadataAppendingCollector(metadataConverters);
+			this.producedTypeInfo = producedTypeInfo;
+		}
+
+		@Override
+		public void open(DeserializationSchema.InitializationContext context) throws Exception {
+			valueDeserialization.open(context);
+		}
+
+		@Override
+		public boolean isEndOfStream(RowData nextElement) {
+			return false;
+		}
+
+		@Override
+		public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+			throw new IllegalStateException("A collector is required for deserializing.");
+		}
+
+		@Override
+		public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
+			metadataAppendingCollector.inputRecord = record;
+			metadataAppendingCollector.outputCollector = collector;
+			valueDeserialization.deserialize(record.value(), metadataAppendingCollector);
+		}
+
+		@Override
+		public TypeInformation<RowData> getProducedType() {
+			return producedTypeInfo;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+
+		private final MetadataConverter[] metadataConverters;
+
+		private transient ConsumerRecord<?, ?> inputRecord;
+
+		private transient Collector<RowData> outputCollector;
+
+		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+			this.metadataConverters = metadataConverters;
+		}
+
+		@Override
+		public void collect(RowData physicalRow) {
+			final int metadataArity = metadataConverters.length;
+			// shortcut if no metadata is required
+			if (metadataArity == 0) {

Review comment:
       We can have a `hasMetadata` final member variable to allow JIT compiler optimization.
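
A rough sketch of that idea, with simplified placeholder types instead of `RowData`, `ConsumerRecord`, and `MetadataConverter` (all names below are assumptions). The flag is computed once in the constructor and stored in a final field; whether the JIT actually benefits is not measured here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified collector; names and types are placeholders.
final class AppendingCollectorSketch {

    private final List<Function<String, Object>> converters;

    // Computed once instead of checking the converter count on every collect() call.
    private final boolean hasMetadata;

    // Collected output rows, kept in a list for demonstration purposes.
    final List<List<Object>> out = new ArrayList<>();

    AppendingCollectorSketch(List<Function<String, Object>> converters) {
        this.converters = converters;
        this.hasMetadata = !converters.isEmpty();
    }

    void collect(String record, List<Object> physicalRow) {
        // Shortcut if no metadata is required.
        if (!hasMetadata) {
            out.add(physicalRow);
            return;
        }
        // Otherwise append one metadata value per converter to a copy of the row.
        List<Object> row = new ArrayList<>(physicalRow);
        for (Function<String, Object> converter : converters) {
            row.add(converter.apply(record));
        }
        out.add(row);
    }
}
```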

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
##########
@@ -18,20 +18,439 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
+import org.apache.flink.streaming.connectors.kafka.KafkaTestBaseWithFlink;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
- * IT cases for Kafka for Table API & SQL.
+ * Basic IT cases for the Kafka table source and sink.
  */
-public class KafkaTableITCase extends KafkaTableTestBase {
+@RunWith(Parameterized.class)
+public class KafkaTableITCase extends KafkaTestBaseWithFlink {
+
+	private static final String JSON_FORMAT = "json";
+	private static final String AVRO_FORMAT = "avro";
+	private static final String CSV_FORMAT = "csv";
+
+	@Parameterized.Parameter
+	public boolean isLegacyConnector;
+
+	@Parameterized.Parameter(1)
+	public String format;
+
+	@Parameterized.Parameters(name = "legacy = {0}, format = {1}")
+	public static Object[] parameters() {
+		return new Object[][]{
+			// cover all 3 formats for new and old connector
+			new Object[]{false, JSON_FORMAT},
+			new Object[]{false, AVRO_FORMAT},
+			new Object[]{false, CSV_FORMAT},
+			new Object[]{true, JSON_FORMAT},
+			new Object[]{true, AVRO_FORMAT},
+			new Object[]{true, CSV_FORMAT}
+		};
+	}
+
+	protected StreamExecutionEnvironment env;
+	protected StreamTableEnvironment tEnv;
+
+	@Before
+	public void setup() {
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		tEnv = StreamTableEnvironment.create(
+			env,
+			EnvironmentSettings.newInstance()
+				// Watermark is only supported in blink planner
+				.useBlinkPlanner()
+				.inStreamingMode()
+				.build()
+		);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		// we have to use single parallelism,
+		// because we will count the messages in sink to terminate the job
+		env.setParallelism(1);
+	}
+
+	@Test
+	public void testKafkaSourceSink() throws Exception {
+		// we always use a different topic name for each parameterized topic,
+		// in order to make sure the topic can be created.
+		final String topic = "tstopic_" + format + "_" + isLegacyConnector;
+		createTestTopic(topic, 1, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+		String groupId = standardProps.getProperty("group.id");
+		String bootstraps = standardProps.getProperty("bootstrap.servers");
+
+		final String createTable;
+		if (!isLegacyConnector) {
+			createTable = String.format(
+				"create table kafka (\n" +
+					"  `computed-price` as price + 1.0,\n" +
+					"  price decimal(38, 18),\n" +
+					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
+					"  log_ts timestamp(3),\n" +
+					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
+					"  watermark for ts as ts\n" +
+					") with (\n" +
+					"  'connector' = '%s',\n" +
+					"  'topic' = '%s',\n" +
+					"  'properties.bootstrap.servers' = '%s',\n" +
+					"  'properties.group.id' = '%s',\n" +
+					"  'scan.startup.mode' = 'earliest-offset',\n" +
+					"  %s\n" +
+					")",
+				KafkaDynamicTableFactory.IDENTIFIER,
+				topic,
+				bootstraps,
+				groupId,
+				formatOptions());
+		} else {
+			createTable = String.format(
+				"create table kafka (\n" +
+					"  `computed-price` as price + 1.0,\n" +
+					"  price decimal(38, 18),\n" +
+					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
+					"  log_ts timestamp(3),\n" +
+					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
+					"  watermark for ts as ts\n" +
+					") with (\n" +
+					"  'connector.type' = 'kafka',\n" +
+					"  'connector.version' = '%s',\n" +
+					"  'connector.topic' = '%s',\n" +
+					"  'connector.properties.bootstrap.servers' = '%s',\n" +
+					"  'connector.properties.group.id' = '%s',\n" +
+					"  'connector.startup-mode' = 'earliest-offset',\n" +
+					"  'update-mode' = 'append',\n" +
+					"  %s\n" +
+					")",
+				KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL,
+				topic,
+				bootstraps,
+				groupId,
+				formatOptions());
+		}
+
+		tEnv.executeSql(createTable);
+
+		String initialValues = "INSERT INTO kafka\n" +
+			"SELECT CAST(price AS DECIMAL(10, 2)), currency, " +
+			" CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" +
+			"FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n" +
+			"  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n" +
+			"  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n" +
+			"  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n" +
+			"  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n" +
+			"  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n" +
+			"  AS orders (price, currency, d, t, ts)";
+		tEnv.executeSql(initialValues).await();
+
+		// ---------- Consume stream from Kafka -------------------
+
+		String query = "SELECT\n" +
+			"  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n" +
+			"  CAST(MAX(log_date) AS VARCHAR),\n" +
+			"  CAST(MAX(log_time) AS VARCHAR),\n" +
+			"  CAST(MAX(ts) AS VARCHAR),\n" +
+			"  COUNT(*),\n" +
+			"  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
+			"FROM kafka\n" +
+			"GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
+
+		DataStream<RowData> result = tEnv.toAppendStream(tEnv.sqlQuery(query), RowData.class);
+		TestingSinkFunction sink = new TestingSinkFunction(2);
+		result.addSink(sink).setParallelism(1);
+
+		try {
+			env.execute("Job_2");
+		} catch (Throwable e) {
+			// we have to use a specific exception to indicate the job is finished,
+			// because the registered Kafka source is infinite.
+			if (!isCausedByJobFinished(e)) {
+				// re-throw
+				throw e;
+			}
+		}
+
+		List<String> expected = Arrays.asList(
+			"+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)",
+			"+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)");
+
+		assertEquals(expected, TestingSinkFunction.rows);
+
+		// ------------- cleanup -------------------
+
+		deleteTestTopic(topic);
+	}
+
+	@Test
+	public void testKafkaTableWithMultipleTopics() throws Exception {
+		if (isLegacyConnector) {
+			return;
+		}
+		// ---------- create source and sink tables -------------------
+		String tableTemp = "create table %s (\n" +
+			"  currency string\n" +
+			") with (\n" +
+			"  'connector' = '%s',\n" +
+			"  'topic' = '%s',\n" +
+			"  'properties.bootstrap.servers' = '%s',\n" +
+			"  'properties.group.id' = '%s',\n" +
+			"  'scan.startup.mode' = 'earliest-offset',\n" +
+			"  %s\n" +
+			")";
+		String groupId = standardProps.getProperty("group.id");
+		String bootstraps = standardProps.getProperty("bootstrap.servers");
+		List<String> currencies = Arrays.asList("Euro", "Dollar", "Yen", "Dummy");
+		List<String> topics = currencies.stream()
+			.map(currency -> String.format("%s_%s", currency, format))
+			.collect(Collectors.toList());
+		// Because kafka connector currently doesn't support write data into multiple topic together,
+		// we have to create multiple sink tables.
+		IntStream.range(0, 4).forEach(index -> {
+			createTestTopic(topics.get(index), 1, 1);
+			tEnv.executeSql(String.format(
+				tableTemp,
+				currencies.get(index).toLowerCase(),
+				KafkaDynamicTableFactory.IDENTIFIER,
+				topics.get(index),
+				bootstraps,
+				groupId,
+				formatOptions()
+			));
+		});
+		// create source table
+		tEnv.executeSql(
+			String.format(
+				tableTemp,
+				"currencies_topic_list",
+				KafkaDynamicTableFactory.IDENTIFIER,
+				String.join(";", topics),
+				bootstraps,
+				groupId,
+				formatOptions()));
+
+		// ---------- Prepare data in Kafka topics -------------------
+		String insertTemp = "INSERT INTO %s\n" +
+			"SELECT currency\n" +
+			" FROM (VALUES ('%s'))\n" +
+			" AS orders (currency)";
+		currencies.forEach(
+			currency -> {
+				try {
+					tEnv.executeSql(String.format(insertTemp, currency.toLowerCase(), currency)).await();
+				} catch (Exception e) {
+					fail(e.getMessage());
+				}
+			});
 
-	@Override
-	public String factoryIdentifier() {
-		return KafkaDynamicTableFactory.IDENTIFIER;
+		// ------------- test the topic-list kafka source -------------------
+		DataStream<RowData> result = tEnv.toAppendStream(tEnv.sqlQuery("SELECT currency FROM currencies_topic_list"), RowData.class);
+		TestingSinkFunction sink = new TestingSinkFunction(4); // expect to receive 4 records
+		result.addSink(sink);
+
+		try {
+			env.execute("Job_3");
+		} catch (Throwable e) {
+			// we have to use a specific exception to indicate the job is finished,
+			// because the registered Kafka source is infinite.
+			if (!isCausedByJobFinished(e)) {
+				// re-throw
+				throw e;
+			}
+		}
+		List<String> expected = Arrays.asList(
+			"+I(Dollar)",
+			"+I(Dummy)",
+			"+I(Euro)",
+			"+I(Yen)");
+		TestingSinkFunction.rows.sort(Comparator.naturalOrder());
+		assertEquals(expected, TestingSinkFunction.rows);
+
+		// ------------- cleanup -------------------
+		topics.forEach(KafkaTestBase::deleteTestTopic);
 	}
 
-	@Override
-	public String kafkaVersion() {
-		return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
+	@Test
+	public void testKafkaSourceSinkWithMetadata() throws Exception {
+		if (isLegacyConnector) {
+			return;
+		}
+		// we always use a different topic name for each parameterized topic,
+		// in order to make sure the topic can be created.
+		final String topic = "metadata_topic_" + format;
+		createTestTopic(topic, 1, 1);
+
+		// ---------- Produce an event time stream into Kafka -------------------
+		String groupId = standardProps.getProperty("group.id");
+		String bootstraps = standardProps.getProperty("bootstrap.servers");
+
+		final String createTable = String.format(
+				"CREATE TABLE kafka (\n"
+						+ "  `physical_1` STRING,\n"
+						+ "  `physical_2` INT,\n"
+						// metadata fields are out of order on purpose
+						+ "  `timestamp-type` STRING METADATA VIRTUAL,\n"
+						+ "  `timestamp` TIMESTAMP(3) METADATA,\n"
+						+ "  `offset` BIGINT METADATA VIRTUAL,\n"
+						+ "  `leader-epoch` INT METADATA VIRTUAL,\n"
+						+ "  `headers` MAP<STRING, BYTES> METADATA,\n"
+						+ "  `partition` INT METADATA VIRTUAL,\n"
+						+ "  `topic` STRING METADATA VIRTUAL,\n"
+						+ "  `physical_3` BOOLEAN\n"
+						+ ") WITH (\n"
+						+ "  'connector' = 'kafka',\n"
+						+ "  'topic' = '%s',\n"
+						+ "  'properties.bootstrap.servers' = '%s',\n"
+						+ "  'properties.group.id' = '%s',\n"
+						+ "  'scan.startup.mode' = 'earliest-offset',\n"
+						+ "  %s\n"
+						+ ")",
+				topic,
+				bootstraps,
+				groupId,
+				formatOptions());
+
+		tEnv.executeSql(createTable);
+
+		String initialValues = "INSERT INTO kafka\n"
+				+ "VALUES\n"
+				+ " ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k1', X'C0FFEE', 'k2', X'BABE'], TRUE),\n"
+				+ " ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', CAST(NULL AS MAP<STRING, BYTES>), FALSE),\n"
+				+ " ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k1', X'10', 'k2', X'20'], TRUE)";
+		tEnv.executeSql(initialValues).await();
+
+		// ---------- Consume stream from Kafka -------------------
+
+		final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafka"), 3);
+
+		final Map<String, byte[]> headers1 = new HashMap<>();
+		headers1.put("k1", new byte[]{(byte) 0xC0, (byte) 0xFF, (byte) 0xEE});
+		headers1.put("k2", new byte[]{(byte) 0xBA, (byte) 0xBE});
+
+		final Map<String, byte[]> headers3 = new HashMap<>();
+		headers3.put("k1", new byte[]{(byte) 0x10});
+		headers3.put("k2", new byte[]{(byte) 0x20});
+
+		final List<Row> expected = Arrays.asList(
+				Row.of("data 1", 1, "CreateTime", LocalDateTime.parse("2020-03-08T13:12:11.123"), 0L, 0, headers1, 0, topic, true),
+				Row.of("data 2", 2, "CreateTime", LocalDateTime.parse("2020-03-09T13:12:11.123"), 1L, 0, Collections.emptyMap(), 0, topic, false),
+				Row.of("data 3", 3, "CreateTime", LocalDateTime.parse("2020-03-10T13:12:11.123"), 2L, 0, headers3, 0, topic, true)
+		);
+
+		assertTrue(Row.deepEquals(expected, result));

Review comment:
       What about adding a message to the assertion that prints the expected and actual rows? That would be helpful for debugging Azure builds.
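
One possible shape for this, sketched with a hypothetical helper in plain Java. `Objects.equals` stands in for `Row.deepEquals` so that the snippet stays self-contained; the helper name and message layout are assumptions.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical helper, not part of the PR; Objects.equals stands in for Row.deepEquals.
final class AssertionMessageSketch {

    // Fails with a message that prints both sides, so CI logs show the actual rows.
    static void assertRowsEqual(List<?> expected, List<?> actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(
                    "Rows differ.\nExpected: " + expected + "\nActual:   " + actual);
        }
    }
}
```

In the test itself, JUnit 4's `assertTrue(String message, boolean condition)` overload could carry the same message. Note that `byte[]` values print as object identities by default, so header maps may need extra formatting to be readable.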







[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510138088



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
##########
@@ -140,7 +140,7 @@
 		 *
 		 * @see TableSchema#toPhysicalRowDataType()
 		 */
-		TypeInformation<?> createTypeInformation(DataType consumedDataType);
+		<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);

Review comment:
       No, it is not a compatible change. But those interfaces are still relatively new and not many people have migrated to the new sources/sinks, so we should make this change now or never and avoid `@SuppressWarnings` in almost all implementations.
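
The difference for callers can be sketched in miniature. `TypeInfo` below is a hypothetical stand-in for `TypeInformation`, and both factory methods are simplified assumptions rather than the actual interface methods.

```java
// Hypothetical miniature of the two signatures; TypeInfo stands in for TypeInformation.
final class GenericReturnSketch {

    static final class TypeInfo<T> {
        final String name;

        TypeInfo(String name) {
            this.name = name;
        }
    }

    // Old shape: the wildcard return type forces callers to cast.
    static TypeInfo<?> createWildcard(String dataType) {
        return new TypeInfo<Object>(dataType);
    }

    // New shape: the type parameter is inferred at the call site.
    static <T> TypeInfo<T> createGeneric(String dataType) {
        return new TypeInfo<>(dataType);
    }

    static String demo() {
        // The caller needs an unchecked cast and a warning suppression.
        @SuppressWarnings("unchecked")
        TypeInfo<String> viaWildcard = (TypeInfo<String>) createWildcard("STRING");

        // No cast and no suppression on the caller side.
        TypeInfo<String> viaGeneric = createGeneric("STRING");

        return viaWildcard.name + "/" + viaGeneric.name;
    }
}
```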







[GitHub] [flink] wuchong commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510230194



##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}

Review comment:
       The comment says "For example, it does not support comparing arrays stored in the values of a map", however, the tests prove that we have supported this. 







[GitHub] [flink] flinkbot edited a comment on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "3f19217861e90c324e6c02a1fde7b705381b72d8",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040",
       "triggerID" : "3f19217861e90c324e6c02a1fde7b705381b72d8",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 3f19217861e90c324e6c02a1fde7b705381b72d8 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040) 
   





[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510368580



##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}

Review comment:
       I was referring to `Row#equals(Object)`. And this doesn't support arrays in values of maps.
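
As an illustration of the limitation and of one way to handle it, here is a small sketch in plain Java. It is not the `Row.deepEquals` implementation from this PR; the class name is hypothetical and only maps and arrays are covered.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch; covers maps and arrays only, not all conversion classes.
final class DeepMapEqualsSketch {

    // Compares two values, descending into map values and arrays.
    static boolean deepEquals(Object a, Object b) {
        if (a instanceof Map && b instanceof Map) {
            Map<?, ?> m1 = (Map<?, ?>) a;
            Map<?, ?> m2 = (Map<?, ?>) b;
            if (m1.size() != m2.size()) {
                return false;
            }
            // Keys are looked up via equals/hashCode, so array keys are not supported here.
            for (Map.Entry<?, ?> entry : m1.entrySet()) {
                if (!m2.containsKey(entry.getKey())
                        || !deepEquals(entry.getValue(), m2.get(entry.getKey()))) {
                    return false;
                }
            }
            return true;
        }
        // Objects.deepEquals compares arrays by content and other objects via equals.
        return Objects.deepEquals(a, b);
    }
}
```

Two maps that hold equal-content `byte[]` values are unequal under `Map#equals` but equal under this helper.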








[GitHub] [flink] wuchong commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510556244



##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}

Review comment:
       Got it.







[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510140104



##########
File path: flink-core/src/main/java/org/apache/flink/types/Row.java
##########
@@ -274,4 +278,119 @@ public static Row join(Row first, Row... remainings) {
 
 		return joinedRow;
 	}
+
+	/**
+	 * Compares two {@link Row}s for deep equality. This method supports all conversion classes of the
+	 * table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
+	 * with this implementation in future versions.
+	 */
+	public static boolean deepEquals(Row row, Object other) {
+		return deepEqualsInternal(row, other);
+	}
+
+	/**
+	 * Compares two {@link List}s of {@link Row} for deep equality. This method supports all conversion
+	 * classes of the table ecosystem.
+	 *
+	 * <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
+	 * nested row structures that might be created in the table ecosystem. For example, it does not
+	 * support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
+	 * with this implementation in future versions.
+	 */
+	public static boolean deepEquals(List<Row> l1, List<Row> l2) {
+		return deepEqualsInternal(l1, l2);
+	}
+
+	private static boolean deepEqualsInternal(Object o1, Object o2) {
+		if (o1 == o2) {
+			return true;
+		} else if (o1 == null || o2 == null) {
+			return false;
+		} else if (o1 instanceof Row && o2 instanceof Row) {
+			return deepEqualsRow((Row) o1, (Row) o2);
+		} else if (o1 instanceof Object[] && o2 instanceof Object[]) {
+			return deepEqualsArray((Object[]) o1, (Object[]) o2);
+		} else if (o1 instanceof Map && o2 instanceof Map) {
+			return deepEqualsMap((Map<?, ?>) o1, (Map<?, ?>) o2);
+		} else if (o1 instanceof List && o2 instanceof List) {
+			return deepEqualsList((List<?>) o1, (List<?>) o2);
+		}
+		return Objects.deepEquals(o1, o2);
+	}
+
+	private static boolean deepEqualsRow(Row row1, Row row2) {
+		if (row1.getKind() != row2.getKind()) {
+			return false;
+		}
+		if (row1.getArity() != row2.getArity()) {
+			return false;
+		}
+		for (int pos = 0; pos < row1.getArity(); pos++) {
+			final Object f1 = row1.getField(pos);
+			final Object f2 = row2.getField(pos);
+			if (!deepEqualsInternal(f1, f2)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static boolean deepEqualsArray(Object[] a1, Object[] a2) {
+		if (a1.getClass() != a2.getClass()) {
+			return false;
+		}
+		if (a1.length != a2.length) {
+			return false;
+		}
+		for (int pos = 0; pos < a1.length; pos++) {
+			final Object e1 = a1[pos];
+			final Object e2 = a2[pos];
+			if (!deepEqualsInternal(e1, e2)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static <K, V> boolean deepEqualsMap(Map<K, V> m1, Map<?, ?> m2) {
+		// copied from HashMap.equals but with deepEquals comparison
+		if (m1.size() != m2.size()) {
+			return false;
+		}
+		try {
+			for (Map.Entry<K, V> e : m1.entrySet()) {
+				K key = e.getKey();
+				V value = e.getValue();
+				if (value == null) {
+					if (!(m2.get(key) == null && m2.containsKey(key))) {
+						return false;
+					}
+				} else {
+					if (!deepEqualsInternal(value, m2.get(key))) {
+						return false;
+					}
+				}
+			}
+		} catch (ClassCastException | NullPointerException unused) {
+			return false;
+		}
+		return true;
+	}
+
+	private static <E> boolean deepEqualsList(List<E> l1, List<?> l2) {
+		final Iterator<E> i1 = l1.iterator();
+		final Iterator<?> i2 = l2.iterator();
+		while (i1.hasNext() && i2.hasNext()) {

Review comment:
       I copied this implementation from `java.util.AbstractList#equals`, but I don't have a strong opinion on this. `LinkedList`s are usually uncommon, I guess.
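
For reference, the iterator-based pattern of `java.util.AbstractList#equals` walks both lists in lockstep and then checks that both iterators are exhausted, so it never relies on indexed access. The quoted diff is cut off at the `while` line, so the loop body below is an assumption modeled on `AbstractList#equals`, and `Objects.deepEquals` stands in for the PR's `deepEqualsInternal`. The class name `IteratorListEqualsDemo` is hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;

final class IteratorListEqualsDemo {

    // Sketch of an AbstractList#equals-style comparison with deep element checks.
    static boolean deepEqualsList(List<?> l1, List<?> l2) {
        final Iterator<?> i1 = l1.iterator();
        final Iterator<?> i2 = l2.iterator();
        while (i1.hasNext() && i2.hasNext()) {
            // Objects.deepEquals compares arrays by content (assumption:
            // the PR delegates to its own deepEqualsInternal here).
            if (!Objects.deepEquals(i1.next(), i2.next())) {
                return false;
            }
        }
        // Equal only if both iterators are exhausted at the same time.
        return !(i1.hasNext() || i2.hasNext());
    }

    public static void main(String[] args) {
        List<Object> arrayBacked = Arrays.<Object>asList(new int[] {1, 2}, "a");
        List<Object> linked = new LinkedList<Object>(Arrays.<Object>asList(new int[] {1, 2}, "a"));
        // Same contents in different List implementations.
        System.out.println(deepEqualsList(arrayBacked, linked));
        // Different lengths.
        System.out.println(deepEqualsList(arrayBacked, Arrays.<Object>asList("a")));
    }
}
```

Because only iterators are used, the comparison is linear for both `ArrayList` and `LinkedList` inputs.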







[GitHub] [flink] wuchong commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510233222



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       Is it possible to remove the `debezium-json` prefix? The Javadoc also says "In case of duplicate names in format and source keys, format keys shall have higher precedence."
   
   So far, the metadata keys of format and source are very different. 







[GitHub] [flink] flinkbot commented on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   ## CI report:
   
   * 3f19217861e90c324e6c02a1fde7b705381b72d8 UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   ## CI report:
   
   * 3f19217861e90c324e6c02a1fde7b705381b72d8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040) 
   * d5bc61d26b858cd7ddce10fe875afdca7ca11a9e UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   ## CI report:
   
   * d5bc61d26b858cd7ddce10fe875afdca7ca11a9e Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8129) 
   





[GitHub] [flink] twalthr commented on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-714685958


   @wuchong I updated the PR. I hope I addressed most of your comments.





[GitHub] [flink] flinkbot edited a comment on pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13732:
URL: https://github.com/apache/flink/pull/13732#issuecomment-713724613


   ## CI report:
   
   * 3f19217861e90c324e6c02a1fde7b705381b72d8 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8040) 
   * d5bc61d26b858cd7ddce10fe875afdca7ca11a9e Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8129) 
   





[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510146400



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       I would suggest keeping it as it is. Users can use backticks, and the leader epoch is not used very frequently. Furthermore, once we introduce metadata for formats such as `debezium-json.ingestion-timestamp`, it would be confusing if the format identifier changed from `debezium-json` to `debezium_json` for metadata.







[GitHub] [flink] twalthr closed pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #13732:
URL: https://github.com/apache/flink/pull/13732


   





[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510377821



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       Yes, we can shorten them. The idea was to design the metadata keys similarly to regular options: if key and value are defined, they would get a `key.` and a `value.` prefix. That should be enough; we don't need the `debezium-json` prefix.
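
To make the proposed naming concrete, here is a small sketch of how format metadata keys could be exposed under a `key.` or `value.` prefix. This is only an illustration of the scheme discussed above, not code from the PR: the class `MetadataKeyPrefixDemo` and the helper `withPrefix` are hypothetical, and `ingestion-timestamp` is borrowed from the example key mentioned earlier in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MetadataKeyPrefixDemo {

    // Hypothetical helper: prepends a prefix such as "key." or "value."
    // to every metadata key that a format exposes.
    static List<String> withPrefix(String prefix, List<String> keys) {
        List<String> prefixed = new ArrayList<>(keys.size());
        for (String key : keys) {
            prefixed.add(prefix + key);
        }
        return prefixed;
    }

    public static void main(String[] args) {
        // Assumed example key; real formats may expose different keys.
        List<String> formatKeys = Arrays.asList("ingestion-timestamp");
        System.out.println(withPrefix("value.", formatKeys));
        System.out.println(withPrefix("key.", formatKeys));
    }
}
```

Under this scheme the format identifier itself never appears in the metadata key, so the key stays the same if the underlying format is swapped.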







[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510380217



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       But even for `key.` and `value.`, users would need to use backticks or the FROM clause. I would stick to the naming convention to not cause confusion.







[GitHub] [flink] twalthr commented on a change in pull request #13732: [FLINK-19275][connector-kafka] Support reading and writing Kafka meta… …data

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #13732:
URL: https://github.com/apache/flink/pull/13732#discussion_r510141924



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -202,4 +268,169 @@ public int hashCode() {
 
 		return kafkaConsumer;
 	}
+
+	// --------------------------------------------------------------------------------------------
+	// Metadata handling
+	// --------------------------------------------------------------------------------------------
+
+	private enum ReadableMetadata {
+		TOPIC(
+			"topic",
+			DataTypes.STRING().notNull(),
+			record -> StringData.fromString(record.topic())
+		),
+
+		PARTITION(
+			"partition",
+			DataTypes.INT().notNull(),
+			ConsumerRecord::partition
+		),
+
+		HEADERS(
+			"headers",
+			// key and value of the map are nullable to make handling easier in queries
+			DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable()).notNull(),
+			record -> {
+				final Map<StringData, byte[]> map = new HashMap<>();
+				for (Header header : record.headers()) {
+					map.put(StringData.fromString(header.key()), header.value());
+				}
+				return new GenericMapData(map);
+			}
+		),
+
+		LEADER_EPOCH(
+			"leader-epoch",

Review comment:
       True, then I also need to change the recommendations in the `SupportsMetadata` interfaces:
   ```
   	 * <p>Metadata key names follow the same pattern as mentioned in {@link Factory}. In case of duplicate
   	 * names in format and source keys, format keys shall have higher precedence.
   ```
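
The precedence rule in the quoted Javadoc (format keys win over source keys on duplicate names) can be pictured as a map merge in which format entries are applied last. This is a hedged sketch, not the planner's actual resolution logic: the class `MetadataKeyPrecedenceDemo`, the helper `merge`, and the colliding key `timestamp` are hypothetical, and the map values merely record where each key came from.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MetadataKeyPrecedenceDemo {

    // Hypothetical helper: merges metadata keys so that format keys
    // replace source keys with the same name.
    static Map<String, String> merge(
            Map<String, String> sourceKeys, Map<String, String> formatKeys) {
        Map<String, String> merged = new LinkedHashMap<>(sourceKeys);
        // putAll overwrites existing entries, giving format keys precedence.
        merged.putAll(formatKeys);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> sourceKeys = new LinkedHashMap<>();
        sourceKeys.put("topic", "source");
        sourceKeys.put("timestamp", "source");

        Map<String, String> formatKeys = new LinkedHashMap<>();
        formatKeys.put("timestamp", "format");

        // "timestamp" resolves to the format; "topic" stays with the source.
        System.out.println(merge(sourceKeys, formatKeys));
    }
}
```

If format keys always carry a prefix, as discussed in this thread, such collisions should not occur and the precedence rule becomes a fallback.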



