Posted to user@flink.apache.org by Laura Uzcátegui <la...@gmail.com> on 2018/08/30 14:52:39 UTC

CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Hello,

 At work, we are currently standing up a cluster with the following
configuration:


   - Flink version: 1.4.2
   - HA enabled with ZooKeeper
   - State backend: RocksDB
   - state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
   - state.backend.rocksdb.checkpointdir: hdfs://namenode:9000/flink/checkpoints
   - high-availability.storageDir: hdfs://namenode:9000/flink/recovery

We also have a job running with checkpointing enabled and without
externalized checkpoints.
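
For reference, here is a minimal sketch (not from the thread) of what such a
job setup could look like with the Flink 1.4 DataStream API: checkpointing
enabled, RocksDB state backend, no externalized checkpoints. The checkpoint
interval, the socket source/print sink, and the class name are illustrative
placeholders; only the HDFS URI comes from the configuration listed above.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IntegrationTestJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds. Externalized checkpoints are NOT enabled
        // (that would go through env.getCheckpointConfig()), so completed
        // checkpoints are referenced only through the HA store.
        env.enableCheckpointing(60_000);
        env.setStateBackend(
                new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints"));

        // Placeholder unbounded pipeline; the real job is not shown in the thread.
        env.socketTextStream("localhost", 9999).print();

        env.execute("integration-test-job");
    }
}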

We run this job multiple times a day from our integration-test pipeline, and
we have noticed that the folder configured as high-availability.storageDir,
which stores the completedCheckpoint files, keeps accumulating more and more
files. This makes us wonder whether there is any cleanup policy for the
filesystem when HA is enabled.

Under what circumstance would there be an ever-increasing number of
completedCheckpoint files in the HA storage dir when there is only a single
job running over and over again?

Here is a sample of the files we see accumulating over time, eventually
reaching the maximum number of files allowed on the filesystem.

completedCheckpoint00d86c01d8b9
completedCheckpoint00d86e9030a9
completedCheckpoint00d877b74355
completedCheckpoint00d87b3dd9ad
completedCheckpoint00d8815d9afd
completedCheckpoint00d88973195c
completedCheckpoint00d88b4792f2
completedCheckpoint00d890d499dc
completedCheckpoint00d91b00ada2


Cheers,


Laura U.

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Posted by Stephan Ewen <se...@apache.org>.
One final thought: how do you stop the unbounded streaming application?

If you just kill the YARN/Mesos/K8s cluster, Flink will not know that this is
a shutdown and will interpret it as a failure. Because of that, checkpoints
will remain (in DFS and in ZooKeeper).
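
As an aside, here is a minimal sketch (not from the thread) of cancelling the
job before tearing the cluster down, so that Flink reaches a terminal state
and can drop its HA metadata instead of treating the teardown as a failure.
It assumes the pre-FLIP-6 client API of Flink 1.4; the JobManager host/port
and the job ID argument are placeholders, and the same effect can be achieved
with "bin/flink cancel <jobID>".

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class CancelBeforeTeardown {

    public static void main(String[] args) throws Exception {
        // Point the client at the JobManager (placeholder host, default RPC port).
        Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "jobmanager-host");
        config.setInteger(JobManagerOptions.PORT, 6123);

        StandaloneClusterClient client = new StandaloneClusterClient(config);
        try {
            // Cancelling lets the job reach a globally terminal state, after
            // which the JobManager can release its checkpoint handles.
            client.cancel(JobID.fromHexString(args[0]));
        } finally {
            client.shutdown();
        }
    }
}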

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Posted by vino yang <ya...@gmail.com>.
Hi Laura:

Perhaps this happens because the path of a completed checkpoint on HDFS has
no hierarchical structure to identify which job it belongs to; it is just a
fixed prefix plus a randomly generated suffix. My personal advice:


1) Verify it with a clean cluster (clean up any metadata in
Flink/ZooKeeper/HDFS that might cause confusion);
2) Verify the nodes and metadata information (/checkpoints/${jobID}/) on
ZooKeeper (see the sketch below);
3) Check the logs for any relevant error or warning messages.
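
To illustrate point 2), here is a minimal sketch (not from the thread) that
lists the checkpoint znodes ZooKeeper still references for a given job, using
Apache Curator. The connect string is a placeholder, and the znode path
assumes the default layout built from high-availability.zookeeper.path.root
(/flink), high-availability.cluster-id (/default) and the /checkpoints
sub-path; adjust it to your configuration.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ListCheckpointZNodes {

    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "zookeeper-host:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // Assumed default layout: /flink/<cluster-id>/checkpoints/<jobID>
        String path = "/flink/default/checkpoints/" + args[0];

        // Each child corresponds to a retained checkpoint; every entry here is
        // expected to have a matching completedCheckpoint file in the HA storage dir.
        for (String child : zk.getChildren().forPath(path)) {
            System.out.println(child);
        }
        zk.close();
    }
}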

Thanks, vino.

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Posted by Laura Uzcátegui <la...@gmail.com>.
Hi Stephan and Vino,

Thanks for the quick reply and hints.

The configuration for the number of completed checkpoints to retain is set to 1.

Since this is an unbounded job and I can't see it ever finishing, I suspect
the following: because we tear down the cluster every time the integration
test finishes, the completedCheckpoint doesn't get deleted. When the
integration test runs again, it picks up from the latest completedCheckpoint,
but there are cases where that job doesn't run again, so the
completedCheckpoint becomes stale. Is this something that could happen?

Is there any way to check, via logging, whether the job reaches its globally
terminal state before we tear down the cluster?
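
One way such a check could be scripted before teardown is to poll the
JobManager's monitoring REST API for the job's state; a minimal sketch (not
from the thread) is below. The host, port and job ID are placeholders, and it
assumes the /jobs/<jobID> endpoint of the Flink 1.4 monitoring API reports a
"state" field (RUNNING, FINISHED, CANCELED, FAILED).

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckJobStateBeforeTeardown {

    public static void main(String[] args) throws Exception {
        // Placeholders: JobManager web host/port and the job ID under test.
        String jobId = args[0];
        URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId);

        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }

        // Inspect the "state" field: a job that reached its globally terminal
        // state reports FINISHED, CANCELED or FAILED rather than RUNNING.
        System.out.println(body);
    }
}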

Cheers,

Laura

Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Posted by Stephan Ewen <se...@apache.org>.
Hi Laura!

Vino had good pointers. There really should be no case in which this is not
cleaned up.

Is this a bounded job that ends? Is it always the last of the bounded job's
checkpoints that remains?

Best,
Stephan


Re: CompletedCheckpoints are getting Stale ( Flink 1.4.2 )

Posted by vino yang <ya...@gmail.com>.
Hi Laura,

First of all, Flink only keeps one completed checkpoint by default [1]. You
need to confirm whether your configuration is consistent with the number of
files you see (a quick way to check the effective value is sketched after the
reference links below). If they are consistent, then the accumulation has
another cause:

1) Cleanup of completed checkpoints is done by the JobManager (JM). You need
to confirm that it can access your files. [2]
2) The JM asynchronously removes the metadata of old completed checkpoints
from ZooKeeper with a background thread; only after that cleanup succeeds
does it delete the checkpoint data. If the reasons above are excluded,
perhaps you could provide the JM's log to help us confirm whether this is the
cause. I think it would also be appropriate to ping Till. [3]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
[2]:
https://stackoverflow.com/questions/44928624/apache-flink-not-deleting-old-checkpoints
[3]:
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java#L437
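
As a quick way to verify the retained-checkpoint setting referenced in [1],
here is a minimal sketch (not from the thread) that prints the effective
value from the flink-conf.yaml found via FLINK_CONF_DIR; the fallback of 1
matches the documented default.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class PrintRetainedCheckpoints {

    public static void main(String[] args) {
        // Loads flink-conf.yaml from the directory given by FLINK_CONF_DIR.
        Configuration config = GlobalConfiguration.loadConfiguration();

        // Flink keeps only one completed checkpoint unless this key raises the limit.
        int retained = config.getInteger("state.checkpoints.num-retained", 1);
        System.out.println("state.checkpoints.num-retained = " + retained);
    }
}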

Thanks, vino.
