You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2021/03/11 06:02:23 UTC

Checkpoint fail due to timeout

Hello,
We are experiencing the problem with checkpoints failing due to timeout (already set to 30 minute, still failing), checkpoints were not too big before they started to fail, around 1.2Gb. Looks like one of sources (Kafka) never acknowledged (see attached screenshot). What could be the reason?

Thanks,
Alexey



Re: Checkpoint fail due to timeout

Posted by Alexey Trenikhun <ye...@msn.com>.
I also expected improve of checkpointing at the cost of throughput, but in in reality I didn't notice difference neither in checkpointing or throughput. Backlog was purged by Kafka, so can't post thread dump right now, but I doubt that the problem is gone, so will have next chance during next performance run.

Thanks,
Alexey

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Tuesday, March 23, 2021 12:17 AM
To: Alexey Trenikhun <ye...@msn.com>
Cc: ChangZhuo Chen (陳昌倬) <cz...@czchen.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is it what you see?

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x000000002af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <ye...@msn.com> wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit synchronized block on each record, it inverted behavior - now Legacy Source thread waits for checkpointLock, while Source requesting memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
>     at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     -  blocked on java.lang.Object@2af646cc
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <cz...@czchen.org>
> Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     -  blocked on java.lang.Object@5366a0e2
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by Roman Khachatryan <ro...@apache.org>.
Unfortunately, the lock can't be changed as it's part of the public
API (though it will be eliminated with the new source API in FLIP-27).

Theoretically, the change you've made should improve checkpointing at
the cost of throughput. Is it what you see?

But the new stack traces seem strange to me as the emission of the
checkpoint barrier doesn't require a buffer. I also don't see that the
source thread holds the checkpoint lock (something like "locked
<0x000000002af646cc> (a java.lang.Object)"). Could you post or attach
the full thread dump?



Regards,
Roman

On Tue, Mar 23, 2021 at 6:30 AM Alexey Trenikhun <ye...@msn.com> wrote:
>
> I've changed KafkaFetcher (GKafkaFetcher) to enter/exit synchronized block on each record, it inverted behavior - now Legacy Source thread waits for checkpointLock, while Source requesting memorySegment.
>
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
>     ...
>
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
>     at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
>     -  blocked on java.lang.Object@2af646cc
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Monday, March 22, 2021 1:36 AM
> To: ChangZhuo Chen (陳昌倬) <cz...@czchen.org>
> Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Thanks for sharing the thread dump.
>
> It shows that the source thread is indeed back-pressured
> (checkpoint lock is held by a thread which is trying to emit but
> unable to acquire any free buffers).
>
> The lock is per task, so there can be several locks per TM.
>
> @ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
> the same issue (but I can't tell for sure without a full thread dump)
>
>
> Regards,
> Roman
>
> On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
> >
> > On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > > Hi Roman,
> > > I took thread dump:
> > > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> > >     -  blocked on java.lang.Object@5366a0e2
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> > >
> > > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at sun.misc.Unsafe.park(Native Method)
> > >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> > >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> > >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> > >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> > >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> > >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> > >
> > > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
> >
> > Hi,
> >
> > This call stack is similar to our case as described in [0]. Maybe they
> > are the same issue?
> >
> > [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
> >
> >
> > --
> > ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> > http://czchen.info/
> > Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by Alexey Trenikhun <ye...@msn.com>.
I've changed KafkaFetcher (GKafkaFetcher) to enter/exit synchronized block on each record, it inverted behavior - now Legacy Source thread waits for checkpointLock, while Source requesting memorySegment.

"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199 WAITING on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@6c56e8f1
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
    ...

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=203 BLOCKED on java.lang.Object@2af646cc owned by "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=199
    at com.gim.fsp.util.flink.GKafkaFetcher.emitRecordsWithTimestamps(GKafkaFetcher.java:54)
    -  blocked on java.lang.Object@2af646cc
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)

Thanks,
Alexey
________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Monday, March 22, 2021 1:36 AM
To: ChangZhuo Chen (陳昌倬) <cz...@czchen.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     -  blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by Alexey Trenikhun <ye...@msn.com>.
Would it help if checkpoint would be fair lock? It looks strange, downstream produces output, so I assume at some moment buffers become available, but lock can’t be acquired for 3+hours

________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Monday, March 22, 2021 1:36:35 AM
To: ChangZhuo Chen (陳昌倬) <cz...@czchen.org>
Cc: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     -  blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by Roman Khachatryan <ro...@apache.org>.
Thanks for sharing the thread dump.

It shows that the source thread is indeed back-pressured
(checkpoint lock is held by a thread which is trying to emit but
unable to acquire any free buffers).

The lock is per task, so there can be several locks per TM.

@ChangZhuo Chen (陳昌倬) , in the thread you mentioned it is most likely
the same issue (but I can't tell for sure without a full thread dump)


Regards,
Roman

On Tue, Mar 16, 2021 at 3:00 PM ChangZhuo Chen (陳昌倬) <cz...@czchen.org> wrote:
>
> On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> > Hi Roman,
> > I took thread dump:
> > "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
> >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> >     -  blocked on java.lang.Object@5366a0e2
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> >
> > "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at sun.misc.Unsafe.park(Native Method)
> >     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> >     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> >     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> >     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> >     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
> >     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> >
> > Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.
>
> Hi,
>
> This call stack is similar to our case as described in [0]. Maybe they
> are the same issue?
>
> [0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by "ChangZhuo Chen (陳昌倬)" <cz...@czchen.org>.
On Tue, Mar 16, 2021 at 02:32:54AM +0000, Alexey Trenikhun wrote:
> Hi Roman,
> I took thread dump:
> "Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     -  blocked on java.lang.Object@5366a0e2
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 
> "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
>     at sun.misc.Unsafe.park(Native Method)
>     -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
>     at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
>     at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)
> 
> Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.

Hi,

This call stack is similar to our case as described in [0]. Maybe they
are the same issue?

[0] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-debug-checkpoint-savepoint-stuck-in-Flink-1-12-2-td42103.html


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B

Re: Checkpoint fail due to timeout

Posted by Alexey Trenikhun <ye...@msn.com>.
Hi Roman,
I took thread dump:
"Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=200 BLOCKED on java.lang.Object@5366a0e2 owned by "Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    -  blocked on java.lang.Object@5366a0e2
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)

"Legacy Source Thread - Source: digital-itx-eastus2 -> Filter (6/6)#0" Id=202 WAITING on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@6915c7ef
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:319)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:291)

Is it checkpoint lock? Is checkpoint lock per task or per TM? I see multiple threads in SynchronizedStreamTaskActionExecutor.runThrowing blocked on different Objects.

Thanks,
Alexey
________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Monday, March 15, 2021 2:16 AM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Hello Alexey,

Thanks for the details.

It looks like backpressure is indeed the cause of the issue.
You can check that by looking at the (succeeded) checkpoint start
delay in the tasks following the suspected source
(digital-itx-eastus2?).
To be sure, you can take a thread dump (or profile) those sources: the
task thread should be waiting for checkpoint lock; while the legacy
source thread should be holding it and waiting to output data.

One way to deal with this is to use the new Kafka source (based on
FLIP-27) which will hopefully be released in 1.13 (it is an
experimental feature in 1.12).

Regards,
Roman

On Fri, Mar 12, 2021 at 8:43 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> Hello Roman,
>
>  history, details and summary stats are attached.
> There is backpressure on all sources except Source:gca-cfg and Source:heartbeat
> Flink version 1.12.1, I also trying 1.12.2 with same results
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Thursday, March 11, 2021 11:49 PM
> To: Alexey Trenikhun <ye...@msn.com>
> Cc: Flink User Mail List <us...@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Hello,
>
> This can be caused by several reasons such as back-pressure, large
> snapshots or bugs.
>
> Could you please share:
> - the stats of the previous (successful) checkpoints
> - back-pressure metrics for sources
> - which Flink version do you use?
>
> Regards,
> Roman
>
>
> On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > We are experiencing the problem with checkpoints failing due to timeout (already set to 30 minute, still failing), checkpoints were not too big before they started to fail, around 1.2Gb. Looks like one of sources (Kafka) never acknowledged (see attached screenshot). What could be the reason?
> >
> > Thanks,
> > Alexey
> >
> >

Re: Checkpoint fail due to timeout

Posted by Roman Khachatryan <ro...@apache.org>.
Hello Alexey,

Thanks for the details.

It looks like backpressure is indeed the cause of the issue.
You can check that by looking at the (succeeded) checkpoint start
delay in the tasks following the suspected source
(digital-itx-eastus2?).
To be sure, you can take a thread dump (or profile) those sources: the
task thread should be waiting for checkpoint lock; while the legacy
source thread should be holding it and waiting to output data.

One way to deal with this is to use the new Kafka source (based on
FLIP-27) which will hopefully be released in 1.13 (it is an
experimental feature in 1.12).

Regards,
Roman

On Fri, Mar 12, 2021 at 8:43 PM Alexey Trenikhun <ye...@msn.com> wrote:
>
> Hello Roman,
>
>  history, details and summary stats are attached.
> There is backpressure on all sources except Source:gca-cfg and Source:heartbeat
> Flink version 1.12.1, I also trying 1.12.2 with same results
>
> Thanks,
> Alexey
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Thursday, March 11, 2021 11:49 PM
> To: Alexey Trenikhun <ye...@msn.com>
> Cc: Flink User Mail List <us...@flink.apache.org>
> Subject: Re: Checkpoint fail due to timeout
>
> Hello,
>
> This can be caused by several reasons such as back-pressure, large
> snapshots or bugs.
>
> Could you please share:
> - the stats of the previous (successful) checkpoints
> - back-pressure metrics for sources
> - which Flink version do you use?
>
> Regards,
> Roman
>
>
> On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun <ye...@msn.com> wrote:
> >
> > Hello,
> > We are experiencing the problem with checkpoints failing due to timeout (already set to 30 minute, still failing), checkpoints were not too big before they started to fail, around 1.2Gb. Looks like one of sources (Kafka) never acknowledged (see attached screenshot). What could be the reason?
> >
> > Thanks,
> > Alexey
> >
> >

Re: Checkpoint fail due to timeout

Posted by Alexey Trenikhun <ye...@msn.com>.
Hello Roman,


  *    history, details and summary stats are attached.
  *   There is backpressure on all sources except Source:gca-cfg and Source:heartbeat
  *   Flink version 1.12.1, I also trying 1.12.2 with same results

Thanks,
Alexey
________________________________
From: Roman Khachatryan <ro...@apache.org>
Sent: Thursday, March 11, 2021 11:49 PM
To: Alexey Trenikhun <ye...@msn.com>
Cc: Flink User Mail List <us...@flink.apache.org>
Subject: Re: Checkpoint fail due to timeout

Hello,

This can be caused by several reasons such as back-pressure, large
snapshots or bugs.

Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for sources
- which Flink version do you use?

Regards,
Roman


On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun <ye...@msn.com> wrote:
>
> Hello,
> We are experiencing the problem with checkpoints failing due to timeout (already set to 30 minute, still failing), checkpoints were not too big before they started to fail, around 1.2Gb. Looks like one of sources (Kafka) never acknowledged (see attached screenshot). What could be the reason?
>
> Thanks,
> Alexey
>
>

Re: Checkpoint fail due to timeout

Posted by Roman Khachatryan <ro...@apache.org>.
Hello,

This can be caused by several reasons such as back-pressure, large
snapshots or bugs.

Could you please share:
- the stats of the previous (successful) checkpoints
- back-pressure metrics for sources
- which Flink version do you use?

Regards,
Roman


On Thu, Mar 11, 2021 at 7:03 AM Alexey Trenikhun <ye...@msn.com> wrote:
>
> Hello,
> We are experiencing the problem with checkpoints failing due to timeout (already set to 30 minute, still failing), checkpoints were not too big before they started to fail, around 1.2Gb. Looks like one of sources (Kafka) never acknowledged (see attached screenshot). What could be the reason?
>
> Thanks,
> Alexey
>
>