Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/06/09 07:05:00 UTC

[jira] [Commented] (FLINK-27963) FlinkRuntimeException in KafkaSink causes a Flink job to hang

    [ https://issues.apache.org/jira/browse/FLINK-27963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551989#comment-17551989 ] 

Martijn Visser commented on FLINK-27963:
----------------------------------------

I don't think this is a bug on Flink's end. Kafka explicitly returns an exception because the record is larger than max.request.size. Flink cannot know why Kafka refuses to produce the record, so it retries indefinitely. The fix is therefore not in Flink but in the Kafka producer configuration: you need to increase max.request.size.
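
For illustration only (this snippet is not part of the original comment), a minimal sketch of that override could look like the following. It assumes the Flink 1.15 KafkaSinkBuilder methods setKafkaProducerConfig/setProperty and uses an arbitrary example limit of 2 MiB; verify both against the connector version you actually run:

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// Raise the producer-side limit above the largest record you expect to send.
// 2097152 bytes (2 MiB) is only an example value, not a recommendation.
Properties producerProps = new Properties();
producerProps.setProperty("max.request.size", "2097152");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setKafkaProducerConfig(producerProps) // forwarded to the underlying KafkaProducer
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setTopic("test.output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
        .build();
{code}

Note that max.request.size only lifts the client-side check; if the record also exceeds the broker or topic limit, message.max.bytes (broker) or max.message.bytes (topic) has to be raised as well.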

> FlinkRuntimeException in KafkaSink causes a Flink job to hang
> -------------------------------------------------------------
>
>                 Key: FLINK-27963
>                 URL: https://issues.apache.org/jira/browse/FLINK-27963
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.14.4
>            Reporter: Dmytro
>            Priority: Major
>              Labels: FlinkRuntimeException, KafkaSink
>
> If a FlinkRuntimeException occurs in the [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink], the Flink job tries to re-send the failed data and gets stuck in an endless "exception -> send again" loop.
> *Code sample which throws the FlinkRuntimeException:*
> {code:java}
> int numberOfRows = 1;
> int rowsPerSecond = 1;
> DataStream<String> stream = environment.addSource(
>                 new DataGeneratorSource<>(
>                         RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
>                         rowsPerSecond,
>                         (long) numberOfRows),
>                 TypeInformation.of(String.class))
>         .setParallelism(1)
>         .name("string-generator");
> KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>         .setRecordSerializer(
>                 KafkaRecordSerializationSchema.builder().setTopic("test.output")
>                         .setValueSerializationSchema(new SimpleStringSchema())
>                         .build());
> KafkaSink<String> sink = builder.build();
> stream.sinkTo(sink).setParallelism(1).name("output-producer");
> {code}
> *Exception Stack Trace:*
> {code:java}
> 2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
> org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)