Posted to issues@flink.apache.org by "Dmytro (Jira)" <ji...@apache.org> on 2022/06/08 22:23:00 UTC
[jira] [Created] (FLINK-27963) FlinkRuntimeException in KafkaSink causes a Flink job to hang
Dmytro created FLINK-27963:
------------------------------
Summary: FlinkRuntimeException in KafkaSink causes a Flink job to hang
Key: FLINK-27963
URL: https://issues.apache.org/jira/browse/FLINK-27963
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.4, 1.15.0
Reporter: Dmytro
If a FlinkRuntimeException occurs in the [KafkaSink|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#kafka-sink], the Flink job tries to re-send the failed data and gets stuck in an endless "exception -> send again" loop.
*Code sample which throws the FlinkRuntimeException:*
{code:java}
int numberOfRows = 1;
int rowsPerSecond = 1;

// Generate a single string that is larger than the Kafka size limits.
DataStream<String> stream = environment.addSource(
                new DataGeneratorSource<>(
                        RandomGenerator.stringGenerator(1050000), // max.message.bytes=1048588
                        rowsPerSecond,
                        (long) numberOfRows),
                TypeInformation.of(String.class))
        .setParallelism(1)
        .name("string-generator");

KafkaSinkBuilder<String> builder = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder().setTopic("test.output")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build());

KafkaSink<String> sink = builder.build();

stream.sinkTo(sink).setParallelism(1).name("output-producer");
{code}
*Exception Stack Trace:*
{code:java}
2022-06-02/14:01:45.066/PDT [flink-akka.actor.default-dispatcher-4] INFO output-producer: Writer -> output-producer: Committer (1/1) (a66beca5a05c1c27691f7b94ca6ac025) switched from RUNNING to FAILED on 271b1b90-7d6b-4a34-8116-3de6faa8a9bf @ 127.0.0.1 (dataPort=-1).
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka null with FlinkKafkaInternalProducer{transactionalId='null', inTransaction=false, closed=false}
    at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:440) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:421) ~[flink-connector-kafka-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-streaming-java-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-runtime-1.15.0.jar:1.15.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-runtime-1.15.0.jar:1.15.0]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1050088 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
{code}
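The size check that fails in the trace above can be approximated before a record reaches the sink. The following is a minimal sketch and not part of the original report: the class name {{RecordSizeGuard}}, the method {{fitsInRequest}}, and the 128-byte {{ASSUMED_OVERHEAD}} are hypothetical; only the 1048576 limit is taken from the exception message. It uses plain Java with no Flink or Kafka dependencies.

```java
import java.nio.charset.StandardCharsets;

public class RecordSizeGuard {

    // Limit reported in the RecordTooLargeException message (max.request.size).
    public static final int MAX_REQUEST_SIZE = 1_048_576;

    // Hypothetical headroom for record metadata. This is an assumption chosen
    // for illustration, not a value documented by Kafka.
    public static final int ASSUMED_OVERHEAD = 128;

    // Returns true if the UTF-8 encoded value plus the assumed overhead
    // stays within MAX_REQUEST_SIZE.
    public static boolean fitsInRequest(String value) {
        int payloadBytes = value.getBytes(StandardCharsets.UTF_8).length;
        return payloadBytes + ASSUMED_OVERHEAD <= MAX_REQUEST_SIZE;
    }

    // Builds an ASCII string of the given length (works on Java 8).
    static String asciiOfLength(int length) {
        return new String(new char[length]).replace('\0', 'a');
    }

    public static void main(String[] args) {
        // A small value fits; a 1050000-character value, as in the sample
        // job above, does not.
        System.out.println(fitsInRequest(asciiOfLength(1_000)));
        System.out.println(fitsInRequest(asciiOfLength(1_050_000)));
    }
}
```

In a job like the sample above, a predicate of this kind could be applied in a {{filter}} step before {{sinkTo}}, so that oversized records are dropped or routed elsewhere instead of failing the writer. Whether that is acceptable depends on the use case, and it does not address the hang itself.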