Posted to user@flink.apache.org by Ning Shi <ni...@gmail.com> on 2018/11/05 00:28:59 UTC
Questions about Savepoints
I have the following questions regarding savepoint recovery.
- In my job, it takes over 30 minutes to take a savepoint of over 100GB
on 3 TMs. Most of the time is spent after the alignment. I assume it was
serialization and uploading to S3. However, when I resume a new job
from the savepoint, it only takes seconds to recover. It seems too
fast to me. I've tried resuming from the savepoint with a different
parallelism. It was also very fast. Is this expected?
- Are there any log messages on the JM or the TMs indicating when a job
or operator restored state from a savepoint? It'll be very helpful to
know if state is restored especially when the
"--allowNonRestoredState" flag is set.
- If a checkpoint was successfully taken after a savepoint, will
resuming a job from the savepoint try to leverage the checkpoint?
- The job uses Kafka as the source. When I resume it from a savepoint,
when will the job start consuming from Kafka again? Does it wait until
all operators have finished restoring state or does it start as soon
as the source operator finishes restoring? I assume it waits for all
because that's the only way to guarantee transactionality.
- When cancelling a job with a savepoint, is there any way to prevent the
job from cancelling if the savepoint fails? Otherwise, it sounds too
dangerous to use this operation.
Thanks,
--
Ning
Re: Questions about Savepoints
Posted by Yun Tang <my...@live.com>.
Hi Ning
You have asked several questions; I'll try to answer some of them:
- In my job, it takes over 30 minutes to take a savepoint of over 100GB
on 3 TMs. Most of the time is spent after the alignment. I assume it was
serialization and uploading to S3. However, when I resume a new job
from the savepoint, it only takes seconds to recover. It seems too
fast to me. I've tried resuming from the savepoint with a different
parallelism. It was also very fast. Is this expected?
Checkpoint alignment does increase the overall checkpoint duration. As for recovery, I think the 'seconds to recover' you mention may not be accurate. When you see the task RUNNING in the web UI, that is only the beginning: a task in the RUNNING state has not necessarily finished restoring its state backend. The RocksDB state backend starts recovering once you see the log below:
INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Initializing RocksDB keyed state backend.
- Are there any log messages on the JM or the TMs indicating when a job
or operator restored state from a savepoint? It'll be very helpful to
know if state is restored especially when the
"--allowNonRestoredState" flag is set.
If allowNonRestoredState is set and not all operator states can be mapped to the running job, you will see a log like this on the JM:
Skipped checkpoint state for operator xxx
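As a rough way to check for these messages, the sketch below greps log files for the two log lines quoted in this thread. The function name find_restore_logs is hypothetical, and the log file paths you pass in depend on your deployment; other Flink versions or state backends may log different text.

```shell
# Hypothetical helper: print (with line numbers) the restore-related log
# lines quoted in this thread. Pass the JM and TM log files as arguments.
find_restore_logs() {
    grep -n -E \
        'Initializing RocksDB keyed state backend|Skipped checkpoint state for operator' \
        "$@"
}
```

For example, find_restore_logs log/*.log would scan every log file in an assumed log/ directory. No output means neither message was found in those files.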
- If a checkpoint was successfully taken after a savepoint, will
resuming a job from the savepoint try to leverage the checkpoint?
Definitely not. If you want to leverage the checkpoint, resume from the checkpoint directly [1].
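A minimal sketch of the two options, assuming the flink CLI is on the PATH (FLINK_BIN and resume_job are names introduced here for illustration). Both cases use "flink run -s"; what differs is only the path you pass, which is either the savepoint path or the metadata path of a retained checkpoint as described in [1].

```shell
# FLINK_BIN is an assumption: location of the flink CLI, defaulting to "flink".
FLINK_BIN="${FLINK_BIN:-flink}"

# resume_job <savepoint-or-retained-checkpoint-path> <job-jar> [job args...]
# Flink restores from exactly the path given; it does not look for newer
# checkpoints on its own.
resume_job() {
    restore_path="$1"
    shift
    "$FLINK_BIN" run -s "$restore_path" "$@"
}
```

Calling resume_job with a savepoint path restores the savepoint; to pick up the later checkpoint instead, pass that checkpoint's path. The paths and jar name are placeholders you would supply.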
- The job uses Kafka as the source. When I resume it from a savepoint,
when will the job start consuming from Kafka again? Does it wait until
all operators have finished restoring state or does it start as soon
as the source operator finishes restoring? I assume it waits for all
because that's the only way to guarantee transactionality.
As I said above, a task switching to RUNNING does not mean it has finished restoring state; it has actually just started restoring. The source operator starts consuming data once the task has reached run() in StreamTask.java [2]. The checkpoint mechanism guarantees the transactionality [3].
- When cancelling a job with a savepoint, is there any way to prevent the
job from cancelling if the savepoint fails? Otherwise, it sounds too
dangerous to use this operation.
The guide states: "The job will only be cancelled if the savepoint succeeds." [4]
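A hedged sketch of wrapping the cancel-with-savepoint command from [4]. FLINK_BIN and cancel_with_savepoint are hypothetical names, and the sketch assumes the CLI exits with a non-zero status when the savepoint (and therefore the cancellation) fails, which you should verify for your Flink version.

```shell
# FLINK_BIN is an assumption: location of the flink CLI, defaulting to "flink".
FLINK_BIN="${FLINK_BIN:-flink}"

# cancel_with_savepoint <target-directory> <job-id>
# Runs "flink cancel -s <target-directory> <job-id>" and reports the outcome
# based on the CLI's exit status (assumed non-zero on failure).
cancel_with_savepoint() {
    target_dir="$1"
    job_id="$2"
    if "$FLINK_BIN" cancel -s "$target_dir" "$job_id"; then
        echo "savepoint taken and job $job_id cancelled"
    else
        echo "cancel-with-savepoint failed; check whether job $job_id is still running" >&2
        return 1
    fi
}
```

The target directory and job ID are values you supply; on failure the wrapper only reports the error, so confirming that the job is still running remains a manual step.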
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] https://github.com/apache/flink/blob/ddcdfa5b8e89a7fb9bfe065bae376ff8571abf85/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L300
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html#checkpointing
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/cli.html#cancel-with-a-savepoint
Best
Yun Tang