Posted to issues@flink.apache.org by "Yu Yang (Jira)" <ji...@apache.org> on 2020/05/29 08:11:00 UTC
[jira] [Updated] (FLINK-18017) have Kafka connector report metrics on null records
[ https://issues.apache.org/jira/browse/FLINK-18017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Yang updated FLINK-18017:
----------------------------
Summary: have Kafka connector report metrics on null records (was: improve Kafka connector to handle record deserialization exception and report related metrics)
> have Kafka connector report metrics on null records
> ----------------------------------------------------
>
> Key: FLINK-18017
> URL: https://issues.apache.org/jira/browse/FLINK-18017
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.9.1
> Reporter: Yu Yang
> Priority: Major
>
> Corrupted messages can get into the message pipeline for various reasons. When a Flink deserializer fails to deserialize a message and throws an exception because the message is corrupted, the Flink application is blocked until we update the deserializer to handle the exception.
>
> Currently, messages are deserialized as follows in
> flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
> {code:java}
> for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
>     final T value = deserializer.deserialize(record);
>     if (deserializer.isEndOfStream(value)) {
>         // end of stream signaled
>         running = false;
>         break;
>     }
>     // emit the actual record. this also updates offset state atomically
>     // and deals with timestamps and watermark generation
>     emitRecord(value, partition, record.offset(), record);
> }
> {code}
> The Flink Kafka connector needs to catch exceptions from deserialization and expose related metrics.
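> A minimal sketch of the idea (the "skippedRecords" counter name and the metricGroup reference are illustrative assumptions, not the actual connector code):
> {code:java}
> // Assumed context: inside the fetch loop above, with access to the
> // operator's MetricGroup (org.apache.flink.metrics.MetricGroup).
> Counter skippedRecords = metricGroup.counter("skippedRecords");
>
> for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
>     T value;
>     try {
>         value = deserializer.deserialize(record);
>     } catch (Exception e) {
>         // skip the corrupted record instead of failing the job, but make it visible
>         skippedRecords.inc();
>         continue;
>     }
>     if (value == null) {
>         // deserializer signaled "skip this record" by returning null
>         skippedRecords.inc();
>         continue;
>     }
>     if (deserializer.isEndOfStream(value)) {
>         // end of stream signaled
>         running = false;
>         break;
>     }
>     // emit the actual record. this also updates offset state atomically
>     // and deals with timestamps and watermark generation
>     emitRecord(value, partition, record.offset(), record);
> }
> {code}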
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)