Posted to issues@flink.apache.org by "Yu Yang (Jira)" <ji...@apache.org> on 2020/05/29 00:33:00 UTC

[jira] [Created] (FLINK-18017) improve Kafka connector to handle record deserialization exception and report related metrics

Yu Yang created FLINK-18017:
-------------------------------

             Summary: improve Kafka connector to handle record deserialization exception and report related metrics
                 Key: FLINK-18017
                 URL: https://issues.apache.org/jira/browse/FLINK-18017
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.9.1
            Reporter: Yu Yang


Corrupted messages can enter the message pipeline for various reasons. When a Flink deserializer fails to deserialize a message and throws an exception because the message is corrupted, the Flink application is blocked until we update the deserializer to handle the exception.

 

Currently, messages are deserialized as follows in

flink_pinterest/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java
{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
    final T value = deserializer.deserialize(record);

    if (deserializer.isEndOfStream(value)) {
        // end of stream signaled
        running = false;
        break;
    }

    // emit the actual record. this also updates offset state atomically
    // and deals with timestamps and watermark generation
    emitRecord(value, partition, record.offset(), record);
}
{code}
The Flink Kafka connector needs to catch exceptions thrown during deserialization and expose related metrics.
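
A minimal sketch of the requested behavior, written against plain Java rather than the actual connector classes. The names SkipCorruptRecordsSketch, RecordDeserializer, ErrorCounter and deserializeAll are hypothetical stand-ins and are not part of Flink; in the connector the counter would presumably be registered with the operator's metric group. Skipping a failed record (instead of failing the job) is an assumption about the desired policy, not something this issue has settled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class SkipCorruptRecordsSketch {

    /** Hypothetical stand-in for a deserialization schema; not a Flink interface. */
    public interface RecordDeserializer<T> {
        T deserialize(byte[] raw) throws Exception;
    }

    /** Hypothetical stand-in for a metric counter of failed deserializations. */
    public static final class ErrorCounter {
        private final AtomicLong count = new AtomicLong();

        public void inc() {
            count.incrementAndGet();
        }

        public long get() {
            return count.get();
        }
    }

    /**
     * Deserializes every raw record. A record whose deserializer throws is
     * counted in {@code errors} and skipped, so one corrupted message does
     * not block the records that follow it.
     */
    public static <T> List<T> deserializeAll(
            List<byte[]> rawRecords,
            RecordDeserializer<T> deserializer,
            ErrorCounter errors) {
        List<T> emitted = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            T value;
            try {
                value = deserializer.deserialize(raw);
            } catch (Exception e) {
                // Assumed policy: record the failure and move on to the next record.
                errors.inc();
                continue;
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

In the fetcher loop above, the same try/catch would wrap the deserializer.deserialize(record) call. Whether the offset of a skipped record should still be advanced is left open here.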

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)