Posted to user@flink.apache.org by simpleusr <ce...@gmail.com> on 2021/02/03 12:59:28 UTC

Checkpoint problem in 1.12.0

Hi

I am trying to upgrade from 1.5.5 to 1.12, and the checkpointing mechanism
seems to be broken in our DataStream jobs that read from the Kafka connector.

Because of the significant version gap and the many backwards-incompatible or
deprecated changes in the Flink runtime between these versions, I had to
modify our jobs. After doing so, I noticed that offsets are not committed to
Kafka on checkpoints for the source connectors.

To simplify the issue, I created two simple reproducer projects:

https://github.com/simpleusr/flink_problem_1.5.5

https://github.com/simpleusr/flink_problem_1.12.0

It seems that there are major changes in the checkpoint infrastructure.

For 1.5.5, checkpoint cycles work as expected, as can be seen from the logs
(please note that the sample project contains a small hack in
org.apache.flink.runtime.minicluster.MiniCluster which prevents the cluster
from stopping):

*[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)

[2021-02-03 10:04:17,409] INFO Completed checkpoint 2 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 11 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)

....................

[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)

[2021-02-03 10:04:27,401] INFO Completed checkpoint 4 for job
08eb15132225903b77ee44f5ca6ad2a5 (43764 bytes in 5 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:873)*

However, for 1.12.0 the checkpoint cycle gets stuck at the initial checkpoint:

*[2021-02-03 10:06:24,504] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1612339584496 for job ce255b141393a358db734db2d27ef0ea.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)*

As far as I can see, the checkpoint cycle is stuck in
org.apache.flink.runtime.checkpoint.CheckpointCoordinator, waiting for
coordinatorCheckpointsComplete, although coordinatorsToCheckpoint is empty:


    final CompletableFuture<?> coordinatorCheckpointsComplete =
        pendingCheckpointCompletableFuture
            .thenComposeAsync((pendingCheckpoint) ->
                OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                    coordinatorsToCheckpoint, pendingCheckpoint, timer),
                timer);


Simply returning early from
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion
when coordinatorsToCheckpoint is empty seems to resolve the problem:

*[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:53,387] INFO Triggering checkpoint 1 (type=CHECKPOINT) @
1612339673380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)

[2021-02-03 10:07:53,607] INFO Completed checkpoint 1 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 225 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)

[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:58,380] INFO Triggering checkpoint 2 (type=CHECKPOINT) @
1612339678380 for job ffb4a06302f7e60e9325f32340d299b2.
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:710)

[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)

[2021-02-03 10:07:58,388] INFO Completed checkpoint 2 for job
ffb4a06302f7e60e9325f32340d299b2 (8324 bytes in 7 ms).
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:1131)*
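
A minimal, self-contained sketch of the early-return idea described above,
using only java.util.concurrent. This is not Flink's code: the Coordinator
interface, the triggerAll method and the class name are hypothetical stand-ins
for illustration. It only shows that a completion future over an empty
coordinator collection can be returned already completed, so nothing
downstream has to wait on it.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class EmptyCoordinatorGuard {

    // Hypothetical stand-in for something that acknowledges a checkpoint.
    // It is NOT Flink's OperatorCoordinator interface.
    public interface Coordinator {
        CompletableFuture<Void> checkpoint(long checkpointId);
    }

    // Returns a future that completes once every coordinator has acknowledged.
    public static CompletableFuture<Void> triggerAll(
            Collection<Coordinator> coordinators, long checkpointId) {
        // Early return: with no coordinators there is nothing to wait for.
        if (coordinators.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<?>[] acks = coordinators.stream()
                .map(c -> c.checkpoint(checkpointId))
                .toArray(CompletableFuture[]::new);
        // Completes when all acknowledgement futures have completed.
        return CompletableFuture.allOf(acks);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = triggerAll(Collections.emptyList(), 1L);
        System.out.println(done.isDone()); // prints "true"
    }
}
```

With an empty collection, triggerAll returns a future whose isDone() is true
immediately. Note that CompletableFuture.allOf() called with zero futures also
returns an already completed future, so in this sketch the explicit guard
mainly avoids building the stream and array in that case.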

I have also created an issue for this:

https://issues.apache.org/jira/browse/FLINK-21248


Please let me know if I am missing something or if there is another solution
that does not require a code change.

We need to perform the upgrade and modify our jobs as soon as possible (I
hope there are no other breaking changes), so any help will be appreciated.




Re: Checkpoint problem in 1.12.0

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reaching out to the Flink community. I will respond on the JIRA
ticket.

Cheers,
Till
