You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vishal Surana <vi...@moengage.com> on 2023/01/23 06:48:05 UTC

Job gets stuck when using kafka transactions and eventually crashes

My job runs fine when running without kafka transactions. The source and
sink are kafka in my job with a couple of RocksDB based stateful operators
taking 100GB each.

When I enable kafka transactions, things go well initially and we can see
high throughput as well. However, after a few hours, the job seems to get
stuck as it's unable to commit the transaction, due to which it's unable to
consume any more messages as we've enabled exactly once processing with
unaligned checkpoints. The number of hours it takes might vary but it
always happens and eventually the job crashes with this exception:

ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception
in thread 'kafka-producer-network-thread |
producer-TRANSACTION_ID_PREFIX-1-17060':
java.lang.OutOfMemoryError: Direct buffer memory\n\tat
java.nio.Bits.reserveMemory(Bits.java: 175)
at java.nio.DirectByteBuffer.(DirectByteBuffer.java: 118)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
at
org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:
152)
at
org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:
58)
at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:
41)
at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:
430)
at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:
637)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:
593)
at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:
327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: 242)
at java.lang.Thread.run(Thread.java: 829)

What seems to be happening all of a sudden? Any suggestions on how to fix
it?

-- 
Regards,
Vishal

Re: Job gets stuck when using kafka transactions and eventually crashes

Posted by Martijn Visser <ma...@apache.org>.
If data flows normally, watermarks will progress to downstream operators.
If you have no records flowing in a partition of a stream and you don't
have an idleness configured, that partition will hold back the progress of
watermarks. Watermarks are essential for the checkpointing mechanism, which
ultimately is necessary for committing towards Kafka. If this fails, the
Flink internals (e.g. which events need to be committed to Kafka) will
grow, because data is flowing from other partitions that needs to be
committed.

From your info, I can't digest if checkpoints are working normally. I would
expect that this is not the case, because if checkpoints are happening
successfully, the Kafka transactions would also be committed.

Best regards,

Martijn

Op wo 25 jan. 2023 om 20:09 schreef Vishal Surana <vi...@moengage.com>:

> Can you elaborate a bit more? While idleness is not what we’re seeing now,
> it could perhaps be an issue later on. What about a certain partition going
> idle will result in state buildup?
>
> Thanks,
> Vishal
> On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser <ma...@apache.org>,
> wrote:
>
> Hi Vishal,
>
> Could idleness be an issue? I could see that if idleness occurs and the
> Kafka Source not going in an idle state, that more internal state (to
> commit Kafka transactions) can build up over time that ultimately causes an
> out of memory problem. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
> for more details on this.
>
> Best regards,
>
> Martijn
>
> Op ma 23 jan. 2023 om 10:53 schreef Vishal Surana <vi...@moengage.com>:
>
>> Could it be that link is unable to commit offsets to Kafka? I know that
>> blinks checkpoint mechanism isn’t tied to its ability to commit offset but
>> at the same time, we’ve seen that the job can take hours to commit offsets
>> while checkpoints go through successfully during that period. But with
>> Kafka transactions enabled, the commit of offset is now required to happen.
>>
>> Thanks,
>> Vishal
>> On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vi...@moengage.com>,
>> wrote:
>>
>> My job runs fine when running without kafka transactions. The source and
>> sink are kafka in my job with a couple of RocksDB based stateful operators
>> taking 100GB each.
>>
>> When I enable kafka transactions, things go well initially and we can see
>> high throughput as well. However, after a few hours, the job seems to get
>> stuck as it's unable to commit the transaction, due to which it's unable to
>> consume any more messages as we've enabled exactly once processing with
>> unaligned checkpoints. The number of hours it takes might vary but it
>> always happens and eventually the job crashes with this exception:
>>
>> ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught
>> exception in thread 'kafka-producer-network-thread |
>> producer-TRANSACTION_ID_PREFIX-1-17060':
>> java.lang.OutOfMemoryError: Direct buffer memory\n\tat
>> java.nio.Bits.reserveMemory(Bits.java: 175)
>> at java.nio.DirectByteBuffer.(DirectByteBuffer.java: 118)
>> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
>> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
>> at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
>> at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
>> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
>> at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
>> at
>> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:
>> 152)
>> at
>> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:
>> 58)
>> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:
>> 41)
>> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:
>> 430)
>> at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
>> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:
>> 637)
>> at
>> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:
>> 593)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
>> at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java: 327)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:
>> 242)
>> at java.lang.Thread.run(Thread.java: 829)
>>
>> What seems to be happening all of a sudden? Any suggestions on how to fix
>> it?
>>
>> --
>> Regards,
>> Vishal
>>
>>

Re: Job gets stuck when using kafka transactions and eventually crashes

Posted by Vishal Surana <vi...@moengage.com>.
Can you elaborate a bit more? While idleness is not what we’re seeing now, it could perhaps be an issue later on. What about a certain partition going idle will result in state buildup?

Thanks,
Vishal
On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser <ma...@apache.org>, wrote:
> Hi Vishal,
>
> Could idleness be an issue? I could see that if idleness occurs and the Kafka Source not going in an idle state, that more internal state (to commit Kafka transactions) can build up over time that ultimately causes an out of memory problem. See https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness for more details on this.
>
> Best regards,
>
> Martijn
>
> > Op ma 23 jan. 2023 om 10:53 schreef Vishal Surana <vi...@moengage.com>:
> > > Could it be that link is unable to commit offsets to Kafka? I know that blinks checkpoint mechanism isn’t tied to its ability to commit offset but at the same time, we’ve seen that the job can take hours to commit offsets while checkpoints go through successfully during that period. But with Kafka transactions enabled, the commit of offset is now required to happen.
> > >
> > > Thanks,
> > > Vishal
> > > On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vi...@moengage.com>, wrote:
> > > > My job runs fine when running without kafka transactions. The source and sink are kafka in my job with a couple of RocksDB based stateful operators taking 100GB each.
> > > >
> > > > When I enable kafka transactions, things go well initially and we can see high throughput as well. However, after a few hours, the job seems to get stuck as it's unable to commit the transaction, due to which it's unable to consume any more messages as we've enabled exactly once processing with unaligned checkpoints. The number of hours it takes might vary but it always happens and eventually the job crashes with this exception:
> > > >
> > > > ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception in thread 'kafka-producer-network-thread | producer-TRANSACTION_ID_PREFIX-1-17060':
> > > > java.lang.OutOfMemoryError: Direct buffer memory\n\tat java.nio.Bits.reserveMemory(Bits.java: 175)
> > > > at java.nio.DirectByteBuffer.(DirectByteBuffer.java: 118)
> > > > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
> > > > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
> > > > at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
> > > > at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
> > > > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
> > > > at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
> > > > at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java: 152)
> > > > at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java: 58)
> > > > at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java: 41)
> > > > at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java: 430)
> > > > at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
> > > > at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java: 637)
> > > > at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java: 593)
> > > > at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
> > > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
> > > > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java: 327)
> > > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: 242)
> > > > at java.lang.Thread.run(Thread.java: 829)
> > > >
> > > > What seems to be happening all of a sudden? Any suggestions on how to fix it?
> > > >
> > > > --
> > > > Regards,
> > > > Vishal

Re: Job gets stuck when using kafka transactions and eventually crashes

Posted by Martijn Visser <ma...@apache.org>.
Hi Vishal,

Could idleness be an issue? I could see that if idleness occurs and the
Kafka Source not going in an idle state, that more internal state (to
commit Kafka transactions) can build up over time that ultimately causes an
out of memory problem. See
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
for more details on this.

Best regards,

Martijn

Op ma 23 jan. 2023 om 10:53 schreef Vishal Surana <vi...@moengage.com>:

> Could it be that link is unable to commit offsets to Kafka? I know that
> blinks checkpoint mechanism isn’t tied to its ability to commit offset but
> at the same time, we’ve seen that the job can take hours to commit offsets
> while checkpoints go through successfully during that period. But with
> Kafka transactions enabled, the commit of offset is now required to happen.
>
> Thanks,
> Vishal
> On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vi...@moengage.com>,
> wrote:
>
> My job runs fine when running without kafka transactions. The source and
> sink are kafka in my job with a couple of RocksDB based stateful operators
> taking 100GB each.
>
> When I enable kafka transactions, things go well initially and we can see
> high throughput as well. However, after a few hours, the job seems to get
> stuck as it's unable to commit the transaction, due to which it's unable to
> consume any more messages as we've enabled exactly once processing with
> unaligned checkpoints. The number of hours it takes might vary but it
> always happens and eventually the job crashes with this exception:
>
> ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception
> in thread 'kafka-producer-network-thread |
> producer-TRANSACTION_ID_PREFIX-1-17060':
> java.lang.OutOfMemoryError: Direct buffer memory\n\tat
> java.nio.Bits.reserveMemory(Bits.java: 175)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java: 118)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
> at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
> at
> org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:
> 152)
> at
> org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:
> 58)
> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:
> 41)
> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:
> 430)
> at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:
> 637)
> at
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:
> 593)
> at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:
> 327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: 242)
> at java.lang.Thread.run(Thread.java: 829)
>
> What seems to be happening all of a sudden? Any suggestions on how to fix
> it?
>
> --
> Regards,
> Vishal
>
>

Re: Job gets stuck when using kafka transactions and eventually crashes

Posted by Vishal Surana <vi...@moengage.com>.
Could it be that link is unable to commit offsets to Kafka? I know that blinks checkpoint mechanism isn’t tied to its ability to commit offset but at the same time, we’ve seen that the job can take hours to commit offsets while checkpoints go through successfully during that period. But with Kafka transactions enabled, the commit of offset is now required to happen.

Thanks,
Vishal
On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vi...@moengage.com>, wrote:
> My job runs fine when running without kafka transactions. The source and sink are kafka in my job with a couple of RocksDB based stateful operators taking 100GB each.
>
> When I enable kafka transactions, things go well initially and we can see high throughput as well. However, after a few hours, the job seems to get stuck as it's unable to commit the transaction, due to which it's unable to consume any more messages as we've enabled exactly once processing with unaligned checkpoints. The number of hours it takes might vary but it always happens and eventually the job crashes with this exception:
>
> ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception in thread 'kafka-producer-network-thread | producer-TRANSACTION_ID_PREFIX-1-17060':
> java.lang.OutOfMemoryError: Direct buffer memory\n\tat java.nio.Bits.reserveMemory(Bits.java: 175)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java: 118)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
> at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
> at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java: 152)
> at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java: 58)
> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java: 41)
> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java: 430)
> at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java: 637)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java: 593)
> at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java: 327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: 242)
> at java.lang.Thread.run(Thread.java: 829)
>
> What seems to be happening all of a sudden? Any suggestions on how to fix it?
>
> --
> Regards,
> Vishal