Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/10/24 09:52:00 UTC

[jira] (FLINK-29480) Skip invalid messages when writing

    [ https://issues.apache.org/jira/browse/FLINK-29480 ]


    Martijn Visser deleted comment on FLINK-29480:
    ----------------------------------------

was (Author: martijnvisser):
[~renqs] [~@leonard] Any thoughts on this?

> Skip invalid messages when writing
> ----------------------------------
>
>                 Key: FLINK-29480
>                 URL: https://issues.apache.org/jira/browse/FLINK-29480
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Salva
>            Priority: Minor
>
> As reported in [1], it seems that it is not possible to skip invalid messages when writing. More specifically, if an error occurs while serializing a message, there is no option to skip it, and the Flink job enters a crash loop. In particular, the `write` method of the `KafkaWriter` looks like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, ...);
>   currentProducer.send(record, deliveryCallback); // line 200
>   numRecordsSendCounter.inc();
> } {code}
> So, if your `serialize` method returns `null`, this is what you get at runtime:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
>     at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
> {code}
> What I propose is to modify the KafkaWriter [2, 3] like this:
> {code:java}
> @Override
> public void write(IN element, Context context) throws IOException {
>   final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, ...);
>   if (record != null) { // skip null records
>     currentProducer.send(record, deliveryCallback);
>     numRecordsSendCounter.inc();
>   }
> } {code}
> This would at least make it possible to skip those messages and move on to the next ones.
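> A minimal, dependency-free sketch of the proposed write path, assuming plain-Java stand-ins rather than the real connector classes: the hypothetical `RecordSerializer` interface plays the role of `recordSerializer`, a `Consumer` plays the role of `currentProducer.send(...)`, and a plain counter replaces `numRecordsSendCounter`. It only illustrates the null check, not the actual `KafkaWriter` implementation:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.function.Consumer;
>
> /** Plain-Java sketch of the proposed null-skipping write path (no Flink/Kafka dependency). */
> public class NullSkippingWriterSketch {
>
>   /** Hypothetical stand-in for the record serializer; returning null means "skip". */
>   interface RecordSerializer<IN, R> {
>     R serialize(IN element);
>   }
>
>   /** Hypothetical writer mirroring the proposed KafkaWriter.write(...) logic. */
>   static final class NullSkippingWriter<IN, R> {
>     private final RecordSerializer<IN, R> serializer;
>     private final Consumer<R> producer; // stands in for currentProducer.send(...)
>     private long numRecordsSend = 0;    // stands in for numRecordsSendCounter
>
>     NullSkippingWriter(RecordSerializer<IN, R> serializer, Consumer<R> producer) {
>       this.serializer = serializer;
>       this.producer = producer;
>     }
>
>     void write(IN element) {
>       R record = serializer.serialize(element);
>       if (record != null) { // skip null records instead of passing them to send()
>         producer.accept(record);
>         numRecordsSend++;
>       }
>     }
>
>     long numRecordsSend() {
>       return numRecordsSend;
>     }
>   }
>
>   public static void main(String[] args) {
>     List<String> sent = new ArrayList<>();
>     // Hypothetical serializer: returns null for negative elements, i.e. "cannot serialize".
>     NullSkippingWriter<Integer, String> writer =
>         new NullSkippingWriter<>(n -> n < 0 ? null : "record-" + n, sent::add);
>     for (int i : new int[] {1, -2, 3}) {
>       writer.write(i);
>     }
>     System.out.println(sent + " sent=" + writer.numRecordsSend()); // [record-1, record-3] sent=2
>   }
> }
> {code}
> The element for which the serializer returns `null` is neither sent nor counted, so the job keeps running instead of failing with the `NullPointerException` above.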
> Obviously, one could prepend a flatMap operator to the sink to filter out invalid messages, but:
>  # It seems odd to have to prepend an operator just to "make sure" that the serializer will not fail right afterwards. Wouldn't it be simpler to skip null records directly and avoid this pre-check? [4]
>  # It is (apparently) a very simple change.
>  # It brings consistency/symmetry with the reading case [4, 5].
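> For comparison, the flatMap workaround can be sketched in plain Java too. `Collector` here is a minimal stand-in for Flink's `org.apache.flink.util.Collector`, and `serialize`, `isSerializable` and `dropUnserializable` are hypothetical helpers, not Flink API; in a real job the check would live in a `FlatMapFunction` placed before the sink:
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.List;
>
> /** Plain-Java sketch of the flatMap workaround: drop elements before they reach the sink. */
> public class FlatMapPreFilterSketch {
>
>   /** Minimal stand-in for org.apache.flink.util.Collector. */
>   interface Collector<T> {
>     void collect(T record);
>   }
>
>   /** Hypothetical serializer that throws on input it cannot handle. */
>   static byte[] serialize(String element) {
>     if (element.isEmpty()) {
>       throw new IllegalArgumentException("cannot serialize an empty element");
>     }
>     return element.getBytes(StandardCharsets.UTF_8);
>   }
>
>   /** Pre-check: runs the serializer once and discards the result, only to validate. */
>   static boolean isSerializable(String element) {
>     try {
>       serialize(element);
>       return true;
>     } catch (IllegalArgumentException e) {
>       return false;
>     }
>   }
>
>   /** flatMap-style filter: emits the element only if it passes the pre-check. */
>   static void dropUnserializable(String element, Collector<String> out) {
>     if (isSerializable(element)) {
>       out.collect(element);
>     } // invalid element: emit nothing, so the sink never sees it
>   }
>
>   public static void main(String[] args) {
>     List<String> forwarded = new ArrayList<>();
>     for (String s : List.of("a", "", "b")) {
>       dropUnserializable(s, forwarded::add);
>     }
>     System.out.println(forwarded); // [a, b]
>   }
> }
> {code}
> Each valid element is serialized once in the pre-check and would be serialized again by the sink, which is the duplication point 1 objects to.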
> To expand on point 3, consider `KafkaDeserializationSchema`:
> {code:java}
> T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
>
> default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
>   T deserialized = deserialize(message);
>   if (deserialized != null) { // skip invalid messages
>     out.collect(deserialized);
>   }
> }
> {code}
> One can simply return `null` in the overridden `deserialize` method to skip any message that fails to be deserialized. Similarly, if one uses the `KafkaRecordDeserializationSchema` interface instead:
> {code:java}
> void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
> {code}
> then it is also possible to simply not call `out.collect(...)` for records that cannot be deserialized. To me, it seems strange that the same flexibility is not offered when writing.
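> The two reading-side patterns can be sketched side by side in plain Java, assuming a minimal `Collector` stand-in and a hypothetical `NullableDeserializer` interface that mirrors the default method of `KafkaDeserializationSchema` quoted above. Payloads are parsed as integers purely for illustration; neither type is the real Flink API:
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.List;
>
> /** Plain-Java sketch of the two reading-side skip patterns (no Flink/Kafka dependency). */
> public class SkipOnReadSketch {
>
>   /** Minimal stand-in for org.apache.flink.util.Collector. */
>   interface Collector<T> {
>     void collect(T record);
>   }
>
>   /** Hypothetical mirror of KafkaDeserializationSchema: null from deserialize(...) means "skip". */
>   interface NullableDeserializer<T> {
>     T deserialize(byte[] message) throws Exception;
>
>     default void deserialize(byte[] message, Collector<T> out) throws Exception {
>       T deserialized = deserialize(message);
>       if (deserialized != null) { // skip invalid messages
>         out.collect(deserialized);
>       }
>     }
>   }
>
>   /** Pattern 1: return null when the payload cannot be parsed. */
>   static final NullableDeserializer<Integer> PARSE_OR_NULL = message -> {
>     try {
>       return Integer.parseInt(new String(message, StandardCharsets.UTF_8));
>     } catch (NumberFormatException e) {
>       return null;
>     }
>   };
>
>   /** Pattern 2 (collector style): simply do not call out.collect(...) on failure. */
>   static void deserializeOrSkip(byte[] message, Collector<Integer> out) {
>     Integer value;
>     try {
>       value = Integer.parseInt(new String(message, StandardCharsets.UTF_8));
>     } catch (NumberFormatException e) {
>       return; // invalid message: nothing is collected
>     }
>     out.collect(value);
>   }
>
>   public static void main(String[] args) throws Exception {
>     List<Integer> viaNull = new ArrayList<>();
>     List<Integer> viaCollector = new ArrayList<>();
>     for (String s : List.of("1", "oops", "3")) {
>       byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
>       PARSE_OR_NULL.deserialize(bytes, viaNull::add);
>       deserializeOrSkip(bytes, viaCollector::add);
>     }
>     System.out.println(viaNull + " " + viaCollector); // [1, 3] [1, 3]
>   }
> }
> {code}
> Both variants drop the unparsable message and keep consuming, which is the behavior this issue asks for on the writing side.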
> *References*
> [1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]
> [2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143] 
> [3] [https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L197] 
> [4] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d] 
> [5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)