Posted to user@flink.apache.org by Gagan Agrawal <ag...@gmail.com> on 2018/10/30 10:07:38 UTC

Savepoint failed with error "Checkpoint expired before completing"

Hi,
We have a Flink job (Flink version 1.6.1) which unions 2 streams and passes
them through a custom KeyedProcessFunction with a RocksDB state store, which
finally writes another stream into Kafka. The current checkpoint size is
around 100 GB, and checkpoints are saved to S3 at a 5-minute interval with
incremental checkpointing enabled. Checkpoints mostly finish in less than 1
minute. We are running this job on YARN with the following parameters:

-yn 10  (10 task managers)
-ytm 2048 (2 GB each)
- Operator parallelism is also 10.

While trying to take a savepoint of this job, it runs for ~10 minutes and then
throws the following error. It looks like the default checkpoint timeout of
10 minutes is causing this. What is the recommended way to take a savepoint
for such a job? Should we increase the default checkpoint timeout of 10
minutes? Also, our state size is currently 100 GB but is expected to grow up
to 1 TB. Is Flink a good fit for use cases with that much state? And how long
is a savepoint expected to take with such a state size and parallelism on
YARN? Any other recommendations would be of great help.
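
For a rough sense of scale, the numbers above can be turned into a
back-of-the-envelope estimate. This is only a sketch and not a Flink API: the
class and method names are made up for illustration, and it assumes that a
savepoint writes the full state, that the state is spread evenly over the 10
parallel subtasks, and that "GB" can be treated as GiB. It computes the write
rate each subtask would have to sustain to finish inside the 10-minute timeout.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of Flink. */
final class SavepointEstimate {

    /**
     * Write rate in MiB/s that each parallel subtask must sustain so that a
     * full snapshot of stateBytes finishes within the given timeout.
     * Assumes the state is evenly distributed across subtasks.
     */
    static double requiredMiBPerSecondPerTask(long stateBytes, int parallelism, Duration timeout) {
        double mibPerTask = (double) stateBytes / parallelism / (1024.0 * 1024.0);
        return mibPerTask / timeout.getSeconds();
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024L * 1024L;
        Duration timeout = Duration.ofMinutes(10); // default timeout mentioned in the post

        // Current state size from the post: ~100 GB at parallelism 10.
        System.out.printf("100 GB: %.1f MiB/s per subtask%n",
                requiredMiBPerSecondPerTask(100 * gib, 10, timeout));

        // Expected future state size from the post: ~1 TB at the same parallelism.
        System.out.printf("1 TB:   %.1f MiB/s per subtask%n",
                requiredMiBPerSecondPerTask(1024 * gib, 10, timeout));
    }
}
```

Under these assumptions, 100 GB needs roughly 17 MiB/s per subtask to fit in
10 minutes, and 1 TB needs about ten times that, so either the timeout or the
parallelism would presumably have to grow along with the state.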

org.apache.flink.util.FlinkException: Triggering a savepoint for the job 434398968e635a49329f59a019b41b6f failed.
      at org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
      at org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
      at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
      at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
      at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
      at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
      at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
      at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
      at org.apache.flink.runtime.jobmaster.JobMaster.lambda$triggerSavepoint$13(JobMaster.java:955)
      at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
      at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
      at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
      at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
      at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abortExpired(PendingCheckpoint.java:412)
      at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$triggerCheckpoint$0(CheckpointCoordinator.java:548)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Checkpoint expired before completing
      at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
      at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
      at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
      at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Gagan Agrawal <ag...@gmail.com>.
Good to know, Steven. It would be a useful feature to have separate timeout
configs for checkpoints and savepoints.

Gagan

On Mon, Nov 5, 2018, 10:06 Steven Wu <stevenz3wu@gmail.com wrote:

> FYI, here is the JIRA issue to support a timeout in the savepoint REST API:
> https://issues.apache.org/jira/browse/FLINK-10360

Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Steven Wu <st...@gmail.com>.
FYI, here is the JIRA issue to support a timeout in the savepoint REST API:
https://issues.apache.org/jira/browse/FLINK-10360

On Fri, Nov 2, 2018 at 6:37 PM Gagan Agrawal <ag...@gmail.com> wrote:

> Great, thanks for sharing that info.
>
> Gagan

Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Gagan Agrawal <ag...@gmail.com>.
Great, thanks for sharing that info.

Gagan

On Thu, Nov 1, 2018 at 1:50 PM Yun Tang <my...@live.com> wrote:

> Haha, actually externalized checkpoints also support parallelism changes;
> you can read my email
> <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html>
> posted to the dev mailing list.
>
> Best
> Yun Tang

Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Yun Tang <my...@live.com>.
Haha, actually externalized checkpoints also support parallelism changes; you can read my email <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Why-documentation-always-say-checkpoint-does-not-support-Flink-specific-features-like-rescaling-td23982.html> posted to the dev mailing list.

Best
Yun Tang
________________________________
From: Gagan Agrawal <ag...@gmail.com>
Sent: Thursday, November 1, 2018 13:38
To: myasuka@live.com
Cc: happydexutao@gmail.com; user@flink.apache.org
Subject: Re: Savepoint failed with error "Checkpoint expired before completing"

Thanks, Yun, for your inputs. Yes, increasing the checkpoint timeout helps and we are able to take savepoints now. In our case we wanted to increase parallelism, so I believe a savepoint is the only option, as checkpoints don't support code/parallelism changes.

Gagan

On Wed, Oct 31, 2018 at 8:46 PM Yun Tang <my...@live.com> wrote:
Hi Gagan

A savepoint generally takes more time than a usual incremental checkpoint; you could try increasing the checkpoint timeout [1]:

   env.getCheckpointConfig().setCheckpointTimeout(900000);
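
The 900000 passed above is in milliseconds, i.e. 15 minutes. As a small sketch of how such a value could be derived rather than hard-coded, the snippet below uses only java.time from the JDK. The class name, the helper method, the 20-minute savepoint duration, and the 1.5 safety factor are all assumptions for illustration, not values from this thread or from Flink.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of Flink. */
final class CheckpointTimeoutSketch {

    /** Timeout in milliseconds: the expected snapshot duration plus headroom. */
    static long timeoutMillis(Duration expectedDuration, double safetyFactor) {
        return (long) Math.ceil(expectedDuration.toMillis() * safetyFactor);
    }

    public static void main(String[] args) {
        // The value from the message above, expressed in minutes.
        System.out.println(Duration.ofMillis(900_000L).toMinutes() + " minutes");

        // Assumed example: a savepoint observed to take ~20 minutes, with 50% headroom.
        long timeout = timeoutMillis(Duration.ofMinutes(20), 1.5);
        System.out.println(timeout + " ms");

        // In the job itself, the result would be passed to the call shown above:
        // env.getCheckpointConfig().setCheckpointTimeout(timeout);
    }
}
```

Note that in this version the same timeout applies to both regular checkpoints and savepoints, so a value sized for savepoints also lets a stuck checkpoint run that long before it is aborted.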

If you just want to resume the previous job without changing the state backend, I think you could also try resuming from a retained checkpoint without triggering a savepoint [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint


Best
Yun Tang

________________________________
From: Gagan Agrawal <ag...@gmail.com>
Sent: Wednesday, October 31, 2018 19:03
To: happydexutao@gmail.com
Cc: user@flink.apache.org
Subject: Re: Savepoint failed with error "Checkpoint expired before completing"

Hi Henry,
Thanks for your response. However, we don't face this issue during normal runs, as we have incremental checkpoints. We only face this problem when we try to take a savepoint (which tries to save the entire state in one go).

Gagan

On Wed, Oct 31, 2018 at 11:41 AM 徐涛 <ha...@gmail.com> wrote:
Hi Gagan,
        I have run into the checkpoint timeout error too.
        In my case, it was not due to a big checkpoint size, but due to a slow sink causing high backpressure on the upstream operators, so the barrier could take a long time to arrive at the sink.
        Please check whether this is the case for you.

Best
Henry



Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Gagan Agrawal <ag...@gmail.com>.
Thanks, Yun, for your input. Yes, increasing the checkpoint timeout helped,
and we are able to take savepoints now. In our case we wanted to increase
parallelism, so I believe a savepoint is the only option, as a checkpoint
doesn't support code/parallelism changes.

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Yun Tang <my...@live.com>.
Hi Gagan

A savepoint generally takes more time than a usual incremental checkpoint. You could try increasing the checkpoint timeout [1]:

   env.getCheckpointConfig().setCheckpointTimeout(900000);
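
As a rough sanity check on the timeout value, the sketch below estimates how long a full savepoint might take. It assumes the state is spread evenly across the parallel tasks and that upload throughput to the checkpoint store is the bottleneck. The class and method names, the 20 MB/s per-task rate, and the 1.5 safety factor are hypothetical illustrations, not measurements or Flink APIs; only the ~100 GB state size, the parallelism of 10, and the 10-minute default timeout come from this thread.

```java
public class SavepointTimeoutEstimate {

    /** MB/s each parallel task must sustain to upload its share of the state before the timeout. */
    public static double requiredThroughputMBps(double stateMB, int parallelism, long timeoutSeconds) {
        return stateMB / parallelism / timeoutSeconds;
    }

    /** Seconds needed to write the full state at an assumed per-task upload rate. */
    public static long estimatedSeconds(double stateMB, int parallelism, double perTaskMBps) {
        return (long) Math.ceil(stateMB / parallelism / perTaskMBps);
    }

    /** Timeout in milliseconds (the unit setCheckpointTimeout expects), padded by a safety factor. */
    public static long suggestedTimeoutMillis(long estimatedSeconds, double safetyFactor) {
        return (long) Math.ceil(estimatedSeconds * safetyFactor * 1000);
    }

    public static void main(String[] args) {
        double stateMB = 100_000;      // ~100 GB, as reported in the original post
        int parallelism = 10;          // operator parallelism from the original post
        double assumedRateMBps = 20.0; // hypothetical per-task upload rate; measure your own

        // Rate each task would need to finish within the default 10-minute timeout.
        System.out.printf("Needed for 10 min: %.1f MB/s per task%n",
                requiredThroughputMBps(stateMB, parallelism, 600));

        // Estimated duration at the assumed rate, and a padded timeout in milliseconds.
        long seconds = estimatedSeconds(stateMB, parallelism, assumedRateMBps);
        System.out.println("Estimated duration: " + seconds + " s");
        System.out.println("Suggested timeout: " + suggestedTimeoutMillis(seconds, 1.5) + " ms");
    }
}
```

If the rate needed for the current timeout is well above what the tasks can actually sustain, the savepoint will expire, and the timeout (or the parallelism) has to grow with the state size.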

If you just want to resume from the previous job without changing the state backend, I think you could also try resuming from a retained checkpoint without triggering a savepoint [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint


Best
Yun Tang



Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by Gagan Agrawal <ag...@gmail.com>.
Hi Henry,
Thanks for your response. However, we don't face this issue during normal
runs, as we use incremental checkpoints. We hit this problem only when we
try to take a savepoint (which tries to save the entire state in one go).

Gagan


Re: Savepoint failed with error "Checkpoint expired before completing"

Posted by 徐涛 <ha...@gmail.com>.
Hi Gagan,
	I have also run into the checkpoint timeout error.
	In my case, it was not due to a large checkpoint size but to a slow sink, which caused high backpressure on the upstream operators. The barrier may then take a long time to reach the sink.
	Please check whether this is the case for you.

Best
Henry
