Posted to user@flink.apache.org by Szymon Szczypiński <si...@poczta.fm> on 2018/05/16 21:49:43 UTC

Re: DFS problem with removing checkpoint

Hi,
now I know why those files weren't being "removed". They are removed, but
very slowly. In my case (Flink 1.3) the problem is in the line

client.delete().inBackground(backgroundCallback, executor).forPath(path);

where the deletion runs in the background on an executor pool whose size
is 2. The more files/dirs I have in "high-availability.storageDir" and
"state.backend.fs.checkpointdir", the longer each delete operation takes
and the more operations queue up in the pool. In my case the main problem
is that I have 12 jobs deployed on the cluster and the checkpoint interval
is set to 5 seconds.
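
The queueing effect can be reproduced outside Flink. The following is a
minimal sketch, not Flink's or Curator's actual code: the class name
DeleteBacklogSketch, the method queuedWhileBusy, and the latch standing in
for a slow CIFS share are all assumptions made for illustration. It only
shows that a pool of 2 threads leaves every further delete waiting in the
queue while both workers are blocked on slow operations.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Flink.
public class DeleteBacklogSketch {

    /**
     * Submits `deletes` simulated slow delete operations to a fixed pool of
     * `poolSize` threads and returns how many of them are still waiting in
     * the queue while all workers are blocked.
     */
    public static int queuedWhileBusy(int poolSize, int deletes) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // Stands in for a slow file system: no delete finishes until released.
        CountDownLatch slowFileSystem = new CountDownLatch(1);
        try {
            for (int i = 0; i < deletes; i++) {
                pool.execute(() -> {
                    try {
                        slowFileSystem.await(); // simulated slow delete
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The first `poolSize` tasks occupy the workers; the rest queue up.
            return pool.getQueue().size();
        } finally {
            slowFileSystem.countDown(); // let the simulated deletes finish
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // 12 pending deletes (e.g. one per job) against a pool of size 2.
        System.out.println("queued deletes: " + queuedWhileBusy(2, 12));
    }
}
```

With 12 pending deletes and a pool of 2, ten operations sit in the queue.
If new deletes arrive faster than two workers can finish them, that queue
only grows, which matches the slow cleanup described above.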

I know that I need to increase the interval between checkpoints; I will
increase it to 1 or 5 minutes, depending on each job's business logic.

But I still have a question: where is the size of the executor pool set?
I was analyzing the Flink code and still don't know where the size is
set. Maybe one of the users knows where the pool is created.



Best regards

On 22.04.2018 17:22, Szymon Szczypiński wrote:
> Hi,
> the problem started on 1.3.1. I have now upgraded to Flink 1.3.3.
> I moved my cluster to 1.3.3 because of the JIRA issue
> https://issues.apache.org/jira/browse/FLINK-8807.
>
> I will check in debug mode why the cluster doesn't remove those files;
> maybe I will see why.
>
> Best regards
>
> On 22.04.2018 16:59, Stephan Ewen wrote:
>> Hi!
>>
>> Sorry for the late response... Which Flink version are you on?
>>
>> I am wondering if this is somewhat related to that specific setup: 
>> Windows DFS filesystem mounted on Linux with CIFS
>>
>>   - For the "completedCheckpoint<SomeID>", the cleanup should happen
>>     in the "ZooKeeperCompletedCheckpointStore" when dropping a checkpoint
>>   - For the "state.backend.fs.checkpointdir/JobId/check-<INTEGER>"
>>     directory, it should (in Flink 1.3 and 1.4) be the FileStateHandle
>>     that deletes the parent directory when empty, meaning the last state
>>     chunk to be deleted deletes the parent directory. In Flink 1.5, it is
>>     the disposal call of the CheckpointStorageLocation.
>>
>> Best,
>> Stephan
>>
>> On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński <simon_t@poczta.fm 
>> <ma...@poczta.fm>> wrote:
>>
>>     Hi,
>>
>>     in my case neither gets deleted. In high-availability.storageDir
>>     the number of files of type "completedCheckpoint<SomeID>" keeps
>>     growing, and so does the number of dirs in
>>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>
>>     In my case I have a Windows DFS filesystem mounted on Linux via
>>     the CIFS protocol.
>>
>>     Can you give me a hint or a description of which process is
>>     responsible for removing those files and directories?
>>
>>     Best regards
>>
>>
>>     On 2018-04-02 at 15:58, Stephan Ewen wrote:
>>>     Can you clarify which one does not get deleted? The file in the
>>>     "high-availability.storageDir", or the
>>>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?
>>>
>>>     Could you also tell us which file system you use?
>>>
>>>     There is a known issue in some versions of Flink that S3
>>>     "directories" are not deleted. This means that Hadoop's S3
>>>     marker files (the way that Hadoop's s3n and s3a imitate
>>>     directories in S3) are not deleted. This is fixed in Flink 1.5.
>>>     A workaround for Flink 1.4 is to use the "flink-s3-fs-presto",
>>>     which does not use these marker files.
>>>
>>>
>>>     On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński
>>>     <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>>
>>>         Thank you for your reply.
>>>
>>>         But my problem is not that Flink doesn't remove
>>>         files/directories after an incorrect job cancellation. My
>>>         problem is different.
>>>         Specifically, when everything is OK and the job is in the
>>>         RUNNING state, each completed checkpoint creates a file in
>>>         "high-availability.storageDir" named
>>>         completedCheckpoint<SomeID> and a directory at
>>>         "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>>         Normally, after the next checkpoint completes, the previous
>>>         file and dir are deleted, which is fine because I only ever
>>>         keep one checkpoint.
>>>         But in my case, when the next checkpoint completes, the
>>>         previous one is not deleted, and this happens while the job
>>>         is in the RUNNING state.
>>>
>>>         Maybe you know why those files/dirs are not deleted.
>>>
>>>         Best regards
>>>
>>>         Szymon Szczypiński
>>>
>>>
>>>         On 29.03.2018 11:23, Stephan Ewen wrote:
>>>>         Flink removes these files / directories only if you
>>>>         properly cancel the job. If you kill the processes
>>>>         (stop-cluster.sh) this looks like a failure, and Flink will
>>>>         try to recover. The recovery always starts in ZooKeeper,
>>>>         not in the DFS.
>>>>
>>>>         Best way to prevent this is to
>>>>           - properly cancel jobs, not just kill processes
>>>>           - use separate cluster IDs for the standalone clusters,
>>>>         so that the new cluster knows that it is not supposed to
>>>>         recover the previous jobs and checkpoints
>>>>
>>>>
>>>>
>>>>         On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński
>>>>         <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>>>
>>>>             Hi,
>>>>
>>>>             I have a problem with Flink version 1.3.1.
>>>>
>>>>             I have a standalone cluster with two JobManagers and four
>>>>             TaskManagers; as DFS I use Windows highly available
>>>>             storage mounted via the CIFS protocol.
>>>>
>>>>             Sometimes Flink stops removing the checkpoint dirs for a
>>>>             job and the completedCheckpoint files from
>>>>             "high-availability.storageDir".
>>>>
>>>>             To bring the cluster back to normal operation I need to
>>>>             remove all dirs from the DFS and start everything from
>>>>             the beginning.
>>>>
>>>>
>>>>             Maybe some other Flink user has had the same problem. For
>>>>             now I don't have any idea how to bring the cluster back
>>>>             to normal operation without deleting dirs from the DFS.
>>>>
>>>>             I don't want to delete dirs from the DFS because then I
>>>>             need to redeploy all jobs.
>>>>
>>>>
>>>>             Best regards
>>>>
>>>>             Szymon Szczypiński
>>>>
>>>>
>>>
>>>
>>
>>
>