Posted to dev@flink.apache.org by Teng Fei Liao <te...@gmail.com> on 2020/09/29 02:00:46 UTC

CheckpointedFunction initialization during checkpoint

Hey all,

I've been trying to debug a job recovery performance issue and I'm noticing
some interesting events in the timeline that seem unexpected to me. Here's
a brief outline of the first checkpoint following a job restart:

1. All tasks are deployed and transition into the RUNNING state.
2. I see logs for a subset of initializeState calls ("{} - restoring state"
from TwoPhaseCommitSinkFunction)
3. A checkpoint gets triggered "Triggering checkpoint {} @ {} for job {}."
4. I see more "{} - restoring state" logs.
5. Checkpoint completes "Completed checkpoint {} for job {} ({} bytes in {}
ms)."

The 2 questions I have are:
Are the initializations in 4) in the middle of a checkpoint expected? Since
all the tasks transition in 1) I would think that they are initialized
there as well.

Are the initializations in 4) causing the checkpoint to take longer to
complete? During the checkpoint, I do see "{} - checkpoint {} complete,
committing transaction {} from checkpoint {}" logs
(TwoPhaseCommitSinkFunction's notifyCheckpointComplete method) which
suggests that the Kafka producers in 2) and 4) are contributing to the
checkpoint.

Thanks!

-Teng

Re: CheckpointedFunction initialization during checkpoint

Posted by Yun Tang <my...@live.com>.
Hi Teng

As Aljoscha said, this should not be a problem: across the whole graph, some tasks can already be processing records while others are still restoring state.

For the 2nd question, "Are the initializations in 4) causing the checkpoint to take longer to complete", the answer is yes.
The stream task first transitions into the RUNNING state [1] and only then calls initializeStateAndOpenOperators [2], before it processes any input record or barrier.
In other words, until an operator has finished restoring its state, it cannot process incoming checkpoint barriers.
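
To make the timing argument concrete, here is a toy model in plain Java. It is not Flink code: the class name, method name and durations are made up for illustration, and it assumes a simple linear pipeline while ignoring network latency and the time spent taking the snapshot itself. It only shows that the barrier cannot get past a task that is still restoring, so the slowest restore puts a lower bound on when the checkpoint can complete.

```java
/** Toy model, not Flink code: how state restore delays the first checkpoint. */
final class RestoreDelaySketch {

    /**
     * @param triggerMillis when the checkpoint is triggered, measured from the
     *                      moment the tasks switch to RUNNING
     * @param restoreMillis hypothetical per-task restore durations, in pipeline order
     * @return the earliest time at which the last task could have handled the barrier
     */
    static long earliestCompletionMillis(long triggerMillis, long[] restoreMillis) {
        long barrierAt = triggerMillis;
        for (long restore : restoreMillis) {
            // A task handles the barrier only after it has finished restoring,
            // and it forwards the barrier downstream only after handling it.
            barrierAt = Math.max(barrierAt, restore);
        }
        return barrierAt;
    }

    public static void main(String[] args) {
        // Checkpoint triggered 1s after RUNNING; the middle task needs 5s to restore.
        long[] restoreMillis = {200, 5_000, 300};
        System.out.println(earliestCompletionMillis(1_000, restoreMillis)); // prints 5000
    }
}
```

In this model, a checkpoint triggered after every task has finished restoring is not delayed at all, which matches the point above that only restores still in flight hold up the barriers.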


[1] https://github.com/apache/flink/blob/713d02ef5cc5c668ecaef700257c893201080657/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L711
[2] https://github.com/apache/flink/blob/713d02ef5cc5c668ecaef700257c893201080657/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L496

Best
Yun Tang
________________________________
From: Aljoscha Krettek <al...@apache.org>
Sent: Tuesday, September 29, 2020 20:51
To: dev@flink.apache.org <de...@flink.apache.org>
Subject: Re: CheckpointedFunction initialization during checkpoint

Hi Teng,

I think if the system is slowed down enough it can happen that some
parts of the graph are still restoring while others are already taking a
checkpoint. By virtue of how checkpointing works (by sending barriers
along the network connections between tasks) this should not be a
problem, though.

It would be good to check in the logs if for all individual tasks it
holds that "restoring" comes before "checkpointing".

Best,
Aljoscha



Re: CheckpointedFunction initialization during checkpoint

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Teng,

I think if the system is slowed down enough it can happen that some 
parts of the graph are still restoring while others are already taking a 
checkpoint. By virtue of how checkpointing works (by sending barriers 
along the network connections between tasks) this should not be a 
problem, though.

It would be good to check in the logs if for all individual tasks it 
holds that "restoring" comes before "checkpointing".
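
A rough way to run that check is sketched below in plain Java. It assumes the log lines have already been cut down to the message part, in the "{} - restoring state" and "{} - checkpoint {} complete, ..." shapes quoted in the original mail, and that the leading "{}" identifies the task. The class name, method name and sample task names are hypothetical, and real log lines will carry timestamps and logger names that need stripping first.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch: report tasks that log a checkpoint message before "restoring state". */
final class RestoreOrderCheck {

    static List<String> tasksCheckpointingBeforeRestore(List<String> logLines) {
        Set<String> restored = new HashSet<>();
        List<String> offenders = new ArrayList<>();
        for (String line : logLines) {
            int sep = line.indexOf(" - ");
            if (sep < 0) {
                continue; // not in the assumed "<task> - <message>" shape
            }
            String task = line.substring(0, sep);
            String message = line.substring(sep + 3);
            if (message.startsWith("restoring state")) {
                restored.add(task);
            } else if (message.startsWith("checkpoint ")
                    && !restored.contains(task)
                    && !offenders.contains(task)) {
                // Checkpoint activity was logged before this task logged its restore.
                offenders.add(task);
            }
        }
        return offenders;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("sink-0 - restoring state");
        lines.add("sink-0 - checkpoint 7 complete, committing transaction t1 from checkpoint 7");
        lines.add("sink-1 - restoring state");
        System.out.println(tasksCheckpointingBeforeRestore(lines)); // prints []
    }
}
```

An empty result means every task restored before it took part in a checkpoint, which is the ordering you would expect.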

Best,
Aljoscha
