Posted to user@flink.apache.org by Ning Shi <ni...@gmail.com> on 2018/11/05 00:28:59 UTC

Questions about Savepoints

I have the following questions regarding savepoint recovery.

- In my job, it takes over 30 minutes to take a savepoint of over 100GB
  on 3 TMs. Most of the time is spent after the alignment; I assume it is
  serialization and uploading to S3. However, when I resume a new job
  from the savepoint, it only takes seconds to recover, which seems too
  fast to me. I've tried resuming from the savepoint with a different
  parallelism, and it was also very fast. Is this expected?

- Are there any log messages on the JM or the TMs indicating when a job
  or operator restored state from a savepoint? It would be very helpful to
  know whether state is restored, especially when the
  "--allowNonRestoredState" flag is set.

- If a checkpoint was successfully taken after a savepoint, will
  resuming a job from the savepoint try to leverage the checkpoint?

- The job uses Kafka as the source. When I resume it from a savepoint,
  when will the job start consuming from Kafka again? Does it wait until
  all operators have finished restoring state, or does it start as soon
  as the source operator finishes restoring? I assume it waits for all
  of them, because that's the only way to guarantee transactionality.

- When cancelling a job with a savepoint, is there any way to prevent the
  job from being cancelled if the savepoint fails? Otherwise, this
  operation sounds too dangerous to use.

Thanks,

--
Ning

Re: Questions about Savepoints

Posted by Yun Tang <my...@live.com>.
Hi Ning

You have asked several questions; I'll try to answer some of them:

- In my job, it takes over 30 minutes to take a savepoint of over 100GB
  on 3 TMs. Most of the time is spent after the alignment; I assume it is
  serialization and uploading to S3. However, when I resume a new job
  from the savepoint, it only takes seconds to recover, which seems too
  fast to me. I've tried resuming from the savepoint with a different
  parallelism, and it was also very fast. Is this expected?

Checkpoint alignment adds to the overall checkpoint duration. I also think the 'seconds to recover' you mention may not be accurate. When you see the task as RUNNING in the web UI, that is just the beginning: a RUNNING task does not mean the state backend has been restored. The RocksDB state backend starts restoring after you see the log line below:

INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend.
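To check when restoring actually begins on each TM, rather than relying on the RUNNING status in the web UI, one option is to scan the TaskManager logs for the line above. The following is a minimal Python sketch, assuming each log line starts with a timestamp of the form `yyyy-MM-dd HH:mm:ss,SSS` (adjust the slice and format if your log pattern differs); the helper name `rocksdb_restore_starts` is hypothetical. Keep in mind that this line marks the start of the backend initialization, not the end of the restore.

```python
from datetime import datetime

# Log line quoted in this thread; it appears when the RocksDB keyed backend initializes.
MARKER = "Initializing RocksDB keyed state backend."
# Assumption: every log line begins with a timestamp like "2018-11-05 00:30:01,123".
TS_LEN = len("2018-11-05 00:30:01,123")
TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def rocksdb_restore_starts(lines):
    """Return the timestamps of all log lines containing MARKER (hypothetical helper)."""
    starts = []
    for line in lines:
        if MARKER in line:
            # Parse only the leading timestamp; the rest of the line is ignored.
            starts.append(datetime.strptime(line[:TS_LEN], TS_FORMAT))
    return starts
```

For example, passing an open `taskmanager.log` file object returns one timestamp per matching line, which you can compare with the time the task switched to RUNNING.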


- Are there any log messages on the JM or the TMs indicating when a job
  or operator restored state from a savepoint? It would be very helpful to
  know whether state is restored, especially when the
  "--allowNonRestoredState" flag is set.

If allowNonRestoredState is set and some operator state cannot be mapped to the new job, you should see log messages like this on the JM:

Skipped checkpoint state for operator xxx
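The check behind this message can be pictured with a simplified model: every operator state in the savepoint must match an operator in the new job, and unmatched state is skipped (and logged) only when allowNonRestoredState is set; otherwise the restore fails. The Python sketch below is a toy model under that assumption, not Flink's actual code; the function name, the plain-string operator IDs, and the use of RuntimeError are all hypothetical. Operators that exist only in the new job simply start with empty state.

```python
import logging

logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger("toy-restore")


def map_savepoint_state(savepoint_state, job_operator_ids, allow_non_restored_state):
    """Toy model of matching savepoint state to the operators of a new job.

    savepoint_state: dict of operator ID -> state stored in the savepoint
    job_operator_ids: set of operator IDs in the job being started
    Returns (restored, skipped, fresh):
      restored: state handed to a matching operator in the new job
      skipped:  savepoint operator IDs with no matching operator
      fresh:    new-job operator IDs that start with empty state
    """
    restored, skipped = {}, []
    for op_id, state in savepoint_state.items():
        if op_id in job_operator_ids:
            restored[op_id] = state
        elif allow_non_restored_state:
            skipped.append(op_id)
            LOG.info("Skipped checkpoint state for operator %s", op_id)
        else:
            raise RuntimeError(
                f"State for operator {op_id} cannot be mapped to the new job; "
                "set allowNonRestoredState to skip it."
            )
    fresh = sorted(set(job_operator_ids) - set(restored))
    return restored, skipped, fresh
```

With savepoint state for operators "a" and "b" and a new job containing "a" and "c", setting the flag restores "a", skips "b" (logging the message), and starts "c" empty; without the flag the call raises.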


- If a checkpoint was successfully taken after a savepoint, will
  resuming a job from the savepoint try to leverage the checkpoint?

Definitely not. Resuming from a savepoint restores exactly the state in that savepoint. If you want to use the later checkpoint, resume from the checkpoint itself instead [1].


- The job uses Kafka as the source. When I resume it from a savepoint,
  when will the job start consuming from Kafka again? Does it wait until
  all operators have finished restoring state, or does it start as soon
  as the source operator finishes restoring? I assume it waits for all
  of them, because that's the only way to guarantee transactionality.

As I said above, a task switching to RUNNING does not mean it has finished restoring state; it has actually only just started restoring. The source operator starts consuming data once it reaches run() in StreamTask.java [2], without waiting for the other operators to finish restoring, and the checkpoint mechanism guarantees the transactionality [3].
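A toy model may make this ordering clearer. In the Python sketch below (a simplification, not Flink's StreamTask code), each task first restores its own state and then enters its run loop; there is no barrier that waits for all tasks to finish restoring. A bounded queue stands in for the network buffers between tasks, so the source blocks (backpressure) until the slower downstream task finishes its restore and starts consuming. The task names, the sleep-based "restore", and the `None` end-of-input marker are all assumptions of the model.

```python
import queue
import threading
import time


def run_toy_job(records, sink_restore_seconds=0.2, buffer_size=2):
    """Toy model of two tasks starting up after a restore (not Flink code).

    Each task restores its own state and then enters its run loop. Nothing
    waits for *all* tasks to finish restoring; the only coupling is the
    bounded channel, which blocks the source while it is full (backpressure).
    """
    events = []
    events_lock = threading.Lock()
    channel = queue.Queue(maxsize=buffer_size)  # stands in for network buffers
    received = []

    def log(msg):
        with events_lock:
            events.append(msg)

    def source_task():
        log("source restored")   # e.g. Kafka offsets taken from the savepoint
        log("source running")    # reaches its run loop and starts emitting
        for record in records:
            channel.put(record)  # blocks while the downstream buffer is full
        channel.put(None)        # end-of-input marker (toy model only)

    def sink_task():
        time.sleep(sink_restore_seconds)  # simulates a slow, large state restore
        log("sink restored")
        log("sink running")
        while True:
            record = channel.get()
            if record is None:
                break
            received.append(record)

    threads = [threading.Thread(target=source_task),
               threading.Thread(target=sink_task)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events, received
```

Running `run_toy_job([1, 2, 3, 4, 5])` records "source running" before "sink restored", and the sink still receives all five records in order once it starts.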

- When cancelling a job with a savepoint, is there any way to prevent the
  job from being cancelled if the savepoint fails? Otherwise, this
  operation sounds too dangerous to use.

This is already the behavior. As the guide says, "The job will only be cancelled if the savepoint succeeds." [4]
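The documented ordering (savepoint first, cancellation only on success) can be sketched as follows. This is a hypothetical Python model, not the implementation behind cancel-with-savepoint in [4]; `ToyJob`, `cancel_with_savepoint`, and the `take_savepoint` callable are made-up names. If taking the savepoint raises, the job is left RUNNING.

```python
class ToyJob:
    """Hypothetical stand-in for a running job (not a Flink API)."""

    def __init__(self):
        self.status = "RUNNING"


def cancel_with_savepoint(job, take_savepoint):
    """Cancel `job` only if `take_savepoint()` succeeds.

    Returns the savepoint path on success. If the savepoint fails, the
    exception propagates and the job keeps running.
    """
    path = take_savepoint()   # raises on failure, e.g. an S3 upload error
    job.status = "CANCELED"   # reached only after a successful savepoint
    return path
```

Calling it with a `take_savepoint` that raises leaves `job.status` at "RUNNING"; with one that returns a path, the path is returned and the status becomes "CANCELED".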


[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] https://github.com/apache/flink/blob/ddcdfa5b8e89a7fb9bfe065bae376ff8571abf85/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L300
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html#checkpointing
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/cli.html#cancel-with-a-savepoint

Best
Yun Tang