Posted to dev@flink.apache.org by "Salva (Jira)" <ji...@apache.org> on 2022/09/30 06:08:00 UTC

[jira] [Created] (FLINK-29480) Skip invalid messages when writing

Salva created FLINK-29480:
-----------------------------

             Summary: Skip invalid messages when writing
                 Key: FLINK-29480
                 URL: https://issues.apache.org/jira/browse/FLINK-29480
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Salva


As reported in [1], it does not seem possible to skip messages when writing. More specifically, if serializing a message fails, there is no option to skip it, and the Flink job then enters a crash loop. In particular, the `write` method of the `KafkaWriter` [3] looks like this:

 
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
  final ProducerRecord<byte[], byte[]> record =
      recordSerializer.serialize(element, ...);
  currentProducer.send(record, deliveryCallback); // line 200
  numRecordsSendCounter.inc();
} {code}
So, if you make your `serialize` method return `null`, this is what you get at runtime:

{code:java}
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
{code}
 

What I propose is to modify the KafkaWriter like this:
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
  final ProducerRecord<byte[], byte[]> record =
      recordSerializer.serialize(element, ...);
  if (record != null) { // skip null records
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
  }
} {code}
This would at least make it possible to skip those messages and move on to the next one.
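
To make the intended behaviour concrete, here is a minimal stand-alone sketch of the null check. It is not Flink code: `SkippingWriterSketch`, `serializeOrNull`, the list standing in for the producer, and the plain counter are all hypothetical names introduced only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-alone model of the proposed null check. This is NOT Flink's KafkaWriter:
// the list and the counter are hypothetical stand-ins for the producer and the metric.
class SkippingWriterSketch<IN> {
    private final Function<IN, byte[]> serializer;        // stand-in for recordSerializer
    private final List<byte[]> sent = new ArrayList<>();  // stand-in for currentProducer
    private long numRecordsSent = 0;                      // stand-in for numRecordsSendCounter

    SkippingWriterSketch(Function<IN, byte[]> serializer) {
        this.serializer = serializer;
    }

    void write(IN element) {
        byte[] record = serializer.apply(element);
        if (record != null) { // skip null records
            sent.add(record);
            numRecordsSent++;
        }
    }

    List<byte[]> sent() {
        return sent;
    }

    long numRecordsSent() {
        return numRecordsSent;
    }

    // Hypothetical serializer: returning null marks a message that cannot be serialized.
    static byte[] serializeOrNull(String value) {
        if (value == null || value.trim().isEmpty()) {
            return null;
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```

With this model, writing a valid message, an empty one, and another valid one should leave two records sent and the counter at two, instead of failing on the empty message.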

Obviously, one could prepend the sink with a flatMap operator that filters out invalid messages, but:
 # It looks odd to have to prepend an operator just to make sure that the serializer will not fail. Wouldn't it be simpler to just let it fail, avoiding a pre-check that would basically repeat the same logic? I might be missing something, and maybe there is an intrinsic reason why the serializer is assumed not to fail in the first place.
 # It's (apparently) such a simple change.
 # It brings consistency/symmetry with the reading case [5, 6].
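
The duplication mentioned in point 1 can be sketched without any Flink dependency. In the snippet below, `PreFilterSketch`, `serialize`, `filterValid` and `run` are hypothetical names, and a plain `Consumer` stands in for the collector of a flatMap operator; the point is only that the pre-check ends up running the serialization logic a second time.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone model of the "prepend a flatMap" workaround; not Flink code.
class PreFilterSketch {
    // Hypothetical serializer that throws on invalid input.
    static byte[] serialize(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("invalid message");
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // flatMap-style pre-check: forwards the element only if serialization would succeed.
    static void filterValid(String value, Consumer<String> out) {
        try {
            serialize(value); // same logic that the sink runs again afterwards
            out.accept(value);
        } catch (IllegalArgumentException e) {
            // Invalid message: emit nothing.
        }
    }

    // Pipeline model: pre-filter, then serialize again in the "sink".
    static List<byte[]> run(List<String> input) {
        List<byte[]> written = new ArrayList<>();
        for (String element : input) {
            filterValid(element, valid -> written.add(serialize(valid)));
        }
        return written;
    }
}
```

Every valid element is serialized twice here, once in the filter and once in the sink, which is the repetition a null check in the writer would avoid.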

To expand on point 3, consider `KafkaDeserializationSchema`:
{code:java}
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out) throws Exception {
  T deserialized = deserialize(message);
  if (deserialized != null) { // skip invalid messages
    out.collect(deserialized);
  }
}
{code}
Here one can simply return `null` from the overridden `deserialize` method in order to skip any message that fails to be deserialized. Similarly, if one uses the `KafkaRecordDeserializationSchema` interface instead:
{code:java}
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
{code}
Then one can skip a record simply by not invoking `out.collect(...)` for it. To me, it looks strange that the same flexibility is not offered in the writing case.
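
For comparison, the collector-based skipping on the reading side can be modelled in a few lines. This is only a sketch under stated assumptions: `SkippingDeserializerSketch` and `readAll` are hypothetical, a `Consumer<Integer>` stands in for Flink's `Collector<T>`, and raw byte arrays stand in for `ConsumerRecord` values.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-alone model of collector-based skipping on the reading side; not Flink code.
class SkippingDeserializerSketch {
    // Parses the payload as a decimal integer; emits nothing if it cannot be parsed.
    static void deserialize(byte[] value, Consumer<Integer> out) {
        if (value == null) {
            return; // nothing to emit for a missing payload
        }
        try {
            out.accept(Integer.parseInt(new String(value, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            // Invalid payload: do not call out.accept, so the record is skipped.
        }
    }

    // Runs the deserializer over a batch and returns what was collected.
    static List<Integer> readAll(List<byte[]> rawRecords) {
        List<Integer> collected = new ArrayList<>();
        for (byte[] raw : rawRecords) {
            deserialize(raw, collected::add);
        }
        return collected;
    }
}
```

An unparseable payload in the middle of a batch is dropped and processing continues with the next record, which is the behaviour the issue asks for on the writing side as well.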

 

*References*

[1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]

[2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143] 

[3] [KafkaWriter.java|https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L196]

[5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink] 

[6] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d] 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)