Posted to issues@flink.apache.org by "Dmytro (Jira)" <ji...@apache.org> on 2022/06/08 22:23:00 UTC

[jira] [Created] (FLINK-27963) FlinkRuntimeException in KafkaSink causes a Flink job to hang

Dmytro created FLINK-27963:
------------------------------

             Summary: FlinkRuntimeException in KafkaSink causes a Flink job to hang
                 Key: FLINK-27963
                 URL: https://issues.apache.org/jira/browse/FLINK-27963
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.4, 1.15.0
            Reporter: Dmytro


If a FlinkRuntimeException occurs in the [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink], the Flink job tries to re-send the failed data and gets stuck in an endless "exception -> send again" loop.

*Code sample which throws the FlinkRuntimeException:*

{code:java}
int numberOfRows = 1;
int rowsPerSecond = 1;

DataStream<String> stream = environment.addSource(
                new DataGeneratorSource<>(
                        RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
                        rowsPerSecond,
                        (long) numberOfRows),
                TypeInformation.of(String.class))
        .setParallelism(1)
        .name("string-generator");

KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder().setTopic("test.output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build());

KafkaSink<String> sink = builder.build();

stream.sinkTo(sink).setParallelism(1).name("output-producer");
{code}
*Exception Stack Trace:*
{code:java}
2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
    at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
{code}
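For reference, the arithmetic behind the RecordTooLargeException can be sketched in plain Java. This is only an illustration, not Flink or Kafka client code: the class RecordSizeCheck and its methods are hypothetical, and the 88-byte overhead is simply the difference between the 1050088 bytes reported in the trace and the 1050000-character payload, assuming one byte per character. The real overhead may differ with keys, headers, and record format.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Flink or the Kafka client.
class RecordSizeCheck {
    // The limit named in the stack trace (max.request.size = 1048576 bytes).
    static final int MAX_REQUEST_SIZE = 1_048_576;

    // Assumption: per-record overhead inferred from the trace (1050088 - 1050000).
    static final int OBSERVED_OVERHEAD_BYTES = 88;

    // Estimate the serialized size of a String value encoded as UTF-8.
    static int estimatedSerializedSize(String value) {
        return value.getBytes(StandardCharsets.UTF_8).length + OBSERVED_OVERHEAD_BYTES;
    }

    // True if the estimated size is larger than the given limit.
    static boolean exceedsLimit(String value, int maxRequestSize) {
        return estimatedSerializedSize(value) > maxRequestSize;
    }

    public static void main(String[] args) {
        // Build a 1050000-character ASCII payload, the length used in the code sample.
        StringBuilder sb = new StringBuilder(1_050_000);
        for (int i = 0; i < 1_050_000; i++) {
            sb.append('a');
        }
        String payload = sb.toString();

        System.out.println(estimatedSerializedSize(payload));        // prints 1050088
        System.out.println(exceedsLimit(payload, MAX_REQUEST_SIZE)); // prints true
    }
}
```

Under these assumptions, any payload of this length is rejected on every attempt, so re-sending the same record cannot succeed unless the producer's max.request.size (and the corresponding broker and topic limits) is raised or the record is made smaller.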



--
This message was sent by Atlassian Jira
(v8.20.7#820007)