Posted to issues@flink.apache.org by "Yu Yang (Jira)" <ji...@apache.org> on 2020/05/29 08:11:00 UTC

[jira] [Updated] (FLINK-18017) have Kafka connector report metrics on null records

     [ https://issues.apache.org/jira/browse/FLINK-18017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yu Yang updated FLINK-18017:
----------------------------
    Summary: have Kafka connector report metrics on null records   (was: improve Kafka connector to handle record deserialization exception and report related metrics)

> have Kafka connector report metrics on null records 
> ----------------------------------------------------
>
>                 Key: FLINK-18017
>                 URL: https://issues.apache.org/jira/browse/FLINK-18017
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.1
>            Reporter: Yu Yang
>            Priority: Major
>
> Corrupted messages can get into the message pipeline for various reasons. When a Flink deserializer fails to deserialize a corrupted message and throws an exception, the Flink application is blocked until the deserializer is updated to handle the exception.
>  
> Currently, messages are deserialized as shown below in
> flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
> {code:java}
> for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
>     final T value = deserializer.deserialize(record);
>     if (deserializer.isEndOfStream(value)) {
>         // end of stream signaled
>         running = false;
>         break;
>     }
>     // emit the actual record. this also updates offset state atomically
>     // and deals with timestamps and watermark generation
>     emitRecord(value, partition, record.offset(), record);
> }
>   {code}
> The Flink Kafka connector needs to catch exceptions thrown during deserialization and expose related metrics.
>  
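A minimal sketch of the requested behavior, written against plain Java rather than the Flink API so that it runs on its own. The names Deserializer, Counters, deserializeAll and parseIntOrNull are hypothetical stand-ins, not Flink classes or methods. The sketch assumes that a record whose deserialization throws, or yields null, should be counted and skipped. A real change in the fetcher would also need to keep advancing the partition offset for skipped records and register the counters with the connector's metric group, which this sketch does not attempt.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class TolerantDeserializationSketch {

    /** Hypothetical stand-in for a deserialization schema; not a Flink interface. */
    interface Deserializer<T> {
        T deserialize(byte[] raw) throws Exception;
    }

    /** Hypothetical counters; a real connector would expose these as metrics. */
    static final class Counters {
        final AtomicLong failedRecords = new AtomicLong();
        final AtomicLong nullRecords = new AtomicLong();
    }

    /** Deserializes every record, counting and skipping failures and nulls. */
    static <T> List<T> deserializeAll(
            List<byte[]> records, Deserializer<T> deserializer, Counters counters) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : records) {
            T value;
            try {
                value = deserializer.deserialize(raw);
            } catch (Exception e) {
                // Assumption: a corrupted record is counted and skipped
                // instead of failing the whole job.
                counters.failedRecords.incrementAndGet();
                continue;
            }
            if (value == null) {
                // Assumption: the deserializer signals "nothing to emit" with null.
                counters.nullRecords.incrementAndGet();
                continue;
            }
            emitted.add(value);
        }
        return emitted;
    }

    /** Example deserializer: empty payload gives null, non-numeric payload throws. */
    static Integer parseIntOrNull(byte[] raw) {
        if (raw.length == 0) {
            return null;
        }
        return Integer.valueOf(new String(raw, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8), // corrupted: throws
                new byte[0],                              // deserializes to null
                "3".getBytes(StandardCharsets.UTF_8));
        Counters counters = new Counters();
        List<Integer> emitted = deserializeAll(
                records, TolerantDeserializationSketch::parseIntOrNull, counters);
        System.out.println("emitted=" + emitted
                + " failed=" + counters.failedRecords.get()
                + " null=" + counters.nullRecords.get());
    }
}
```

Keeping separate counters for failed and null records lets an operator distinguish a deserializer that deliberately drops records from one that is hitting corrupted input.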



--
This message was sent by Atlassian Jira
(v8.3.4#803005)