Posted to issues@flink.apache.org by "Qingsheng Ren (Jira)" <ji...@apache.org> on 2022/10/12 05:59:00 UTC

[jira] [Assigned] (FLINK-29492) Kafka exactly-once sink causes OutOfMemoryError

     [ https://issues.apache.org/jira/browse/FLINK-29492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qingsheng Ren reassigned FLINK-29492:
-------------------------------------

    Assignee: Hang Ruan

> Kafka exactly-once sink causes OutOfMemoryError
> -----------------------------------------------
>
>                 Key: FLINK-29492
>                 URL: https://issues.apache.org/jira/browse/FLINK-29492
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.2
>            Reporter: Robert Metzger
>            Assignee: Hang Ruan
>            Priority: Critical
>
> My Kafka exactly-once sinks are periodically failing with an {{OutOfMemoryError: Java heap space}}.
> This looks very similar to FLINK-28250, but I am running 1.15.2, which already contains the fix for FLINK-28250.
> Exception:
> {code:java}
> java.io.IOException: Could not perform checkpoint 2281 for operator http_events[3]: Writer (1/1)#1.
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1210)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
> 	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
> 	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> 	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2281 for operator http_events[3]: Writer (1/1)#1. Failure reason: Checkpoint was declined.
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
> 	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1198)
> 	... 22 more
> Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
> 	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:327)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.getTransactionalProducer(KafkaWriter.java:315)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.snapshotState(KafkaWriter.java:227)
> 	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.snapshotState(StatefulSinkWriterStateHandler.java:124)
> 	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.snapshotState(SinkWriterOperator.java:152)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
> 	... 33 more
> Caused by: org.apache.kafka.common.KafkaException: java.lang.OutOfMemoryError: Java heap space
> 	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
> 	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
> 	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
> 	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
> 	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
> 	... 43 more
> Caused by: java.lang.OutOfMemoryError: Java heap space
> 	at java.base/java.io.BufferedInputStream.fill(Unknown Source)
> 	at java.base/java.io.BufferedInputStream.read1(Unknown Source)
> 	at java.base/java.io.BufferedInputStream.read(Unknown Source)
> 	at java.base/java.io.DataInputStream.read(Unknown Source)
> 	at java.base/java.io.InputStream.readNBytes(Unknown Source)
> 	at java.base/sun.security.util.IOUtils.readExactlyNBytes(Unknown Source)
> 	at java.base/sun.security.provider.JavaKeyStore.engineLoad(Unknown Source)
> 	at java.base/sun.security.util.KeyStoreDelegator.engineLoad(Unknown Source)
> 	at java.base/java.security.KeyStore.load(Unknown Source)
> 	at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.load(DefaultSslEngineFactory.java:374)
> 	at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory$FileBasedStore.<init>(DefaultSslEngineFactory.java:349)
> 	at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createTruststore(DefaultSslEngineFactory.java:322)
> 	at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:168)
> 	at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:138)
> 	at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:95)
> 	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:180)
> 	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
> 	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
> 	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
> 	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:448)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:429)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
> 	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
> 	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:327)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.getTransactionalProducer(KafkaWriter.java:315)
> 	at org.apache.flink.connector.kafka.sink.KafkaWriter.snapshotState(KafkaWriter.java:227)
> 	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.snapshotState(StatefulSinkWriterStateHandler.java:124)
> 	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.snapshotState(SinkWriterOperator.java:152)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
> {code}
> What I'm observing is that affected TaskManagers accumulate a high number of {{kafka-producer-network-thread}} threads (350+ after some time). It seems that the Kafka exactly-once sink is still leaking memory, most likely through producers that are created but never closed.
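
One way to track the thread count described above is to count live threads by name prefix from inside the JVM. The following is a minimal sketch, not part of Flink or the Kafka client: the class name `ProducerThreadCount` and the method `countThreadsWithPrefix` are hypothetical, and the code only sees threads of the JVM it runs in. For an already running TaskManager, a thread dump (for example via `jstack`) gives the same information.

```java
import java.util.Set;

// Hypothetical helper: counts live threads whose name starts with a given prefix.
public class ProducerThreadCount {

    // Thread name prefix taken from the observation in this issue.
    static final String PREFIX = "kafka-producer-network-thread";

    static long countThreadsWithPrefix(String prefix) {
        // getAllStackTraces() returns a snapshot of all live threads in this JVM.
        Set<Thread> liveThreads = Thread.getAllStackTraces().keySet();
        return liveThreads.stream()
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(PREFIX + " count: " + countThreadsWithPrefix(PREFIX));
    }
}
```

If the count keeps growing from one checkpoint to the next instead of levelling off, that would be consistent with producers being created without the old ones being closed.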



--
This message was sent by Atlassian Jira
(v8.20.10#820010)