Posted to user@flink.apache.org by Szymon Szczypiński <si...@poczta.fm> on 2018/05/16 21:49:43 UTC
Re: DFS problem with removing checkpoint
Hi,
now I know why those files weren't "removed". They do get removed, but
very slowly.
In my case (Flink 1.3) the problem is in the line
client.delete().inBackground(backgroundCallback, executor).forPath(path);
where the deletion runs in the background on an executor pool whose size
is 2. When there are more files/dirs in "high-availability.storageDir" and
"state.backend.fs.checkpointdir",
the delete operations take longer and longer, and the operations queued in
the pool keep growing. In my case the main problem is that I have 12 jobs
deployed on the cluster and the checkpoint interval is set to 5 seconds.
I know that I need to increase the interval between checkpoints; I will
increase it to 1 or 5 minutes, depending on each job's business logic.
But I still have one question: where is the size of that executor pool
set? I was analyzing the Flink code and still don't know where the size
is configured. Maybe one of the users knows where the pool is created.
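For reference, the pattern in question looks roughly like this (a minimal,
self-contained Curator sketch; the pool size of 2 matches what I observed,
but the class and variable names here are illustrative, not Flink's code):

    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.api.BackgroundCallback;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    public class BackgroundDeleteSketch {
        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            // The callback of every background delete runs on this pool.
            // With only 2 threads, slow callbacks (e.g. ones discarding
            // state files on a slow DFS) pile up in the pool's queue.
            Executor executor = Executors.newFixedThreadPool(2);

            BackgroundCallback backgroundCallback = (c, event) ->
                    System.out.println("delete finished: " + event.getPath());

            // The line quoted above: the ZK node is deleted asynchronously
            // and the callback is invoked via the given executor.
            client.delete()
                  .inBackground(backgroundCallback, executor)
                  .forPath("/flink/checkpoints/checkpoint-0000000001");
        }
    }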
Best regards
On 22.04.2018 17:22, Szymon Szczypiński wrote:
> Hi,
> the problem started on 1.3.1. Now I have upgraded to Flink 1.3.3.
> I changed my cluster to 1.3.3 because of jira
> https://issues.apache.org/jira/browse/FLINK-8807.
>
> I will check in debug mode why the cluster doesn't remove those files;
> maybe I will see why.
>
> Best regards
>
> On 22.04.2018 16:59, Stephan Ewen wrote:
>> Hi!
>>
>> Sorry for the late response... Which Flink version are you on?
>>
>> I am wondering if this is somewhat related to that specific setup:
>> Windows DFS filesystem mounted on Linux with CIFS
>>
>> - For the "completedCheckpoint<SomeID>", the cleanup should happen
>> in the "ZooKeeperCompletedCheckpointStore" when dropping a checkpoint
>> - For the "state.backend.fs.checkpointdir/JobId/check-<INTEGER>"
>> directory, it should (in Flink 1.3 and 1.4) be the FileStateHandle
>> that deletes the parent directory when empty, meaning the last state
>> chunk to be deleted deletes the parent directory. In Flink 1.5, it is
>> the disposal call of the CheckpointStorageLocation.
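>>
>> To make the second mechanism concrete, a minimal sketch of "the last
>> state chunk to be deleted also deletes the parent directory" (plain
>> java.nio here for illustration; Flink uses its own FileSystem
>> abstraction, and the names below are mine, not Flink's):
>>
>>     import java.io.IOException;
>>     import java.nio.file.DirectoryNotEmptyException;
>>     import java.nio.file.Files;
>>     import java.nio.file.Path;
>>
>>     class StateChunkCleanup {
>>         /** Deletes one state chunk of a checkpoint directory. */
>>         static void deleteChunk(Path chunk) throws IOException {
>>             Files.delete(chunk);
>>             try {
>>                 // Succeeds only once the directory has become empty,
>>                 // i.e. for the last remaining chunk of the checkpoint.
>>                 Files.delete(chunk.getParent());
>>             } catch (DirectoryNotEmptyException e) {
>>                 // Other chunks still exist; a later delete cleans up.
>>             }
>>         }
>>     }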
>>
>> Best,
>> Stephan
>>
>> On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński
>> <simon_t@poczta.fm> wrote:
>>
>> Hi,
>>
>> in my case neither gets deleted. In "high-availability.storageDir"
>> the number of files of type "completedCheckpoint<SomeID>" keeps
>> growing, and so does the number of dirs in
>> "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>
>> In my case I have a Windows DFS filesystem mounted on Linux via the
>> CIFS protocol.
>>
>> Can you give me a hint or a description of which process is
>> responsible for removing those files and directories?
>>
>> Best regards
>>
>>
>> On 2018-04-02 at 15:58, Stephan Ewen wrote:
>>> Can you clarify which one does not get deleted? The file in the
>>> "high-availability.storageDir", or the
>>> "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?
>>>
>>> Could you also tell us which file system you use?
>>>
>>> There is a known issue in some versions of Flink where S3
>>> "directories" are not deleted. This means that Hadoop's S3
>>> marker files (the way that Hadoop's s3n and s3a imitate
>>> directories in S3) are not deleted. This is fixed in Flink 1.5.
>>> A workaround for Flink 1.4 is to use "flink-s3-fs-presto",
>>> which does not use these marker files.
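>>>
>>> As a sketch of that workaround (the jar version and bucket name are
>>> examples, not prescriptions):
>>>
>>>     # make the presto S3 filesystem available to Flink 1.4
>>>     cp ./opt/flink-s3-fs-presto-1.4.0.jar ./lib/
>>>
>>>     # flink-conf.yaml: checkpoints then go through the presto fs
>>>     state.backend.fs.checkpointdir: s3://my-bucket/checkpoints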
>>>
>>>
>>> On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński
>>> <simon_t@poczta.fm> wrote:
>>>
>>> Thank you for your reply.
>>>
>>> But my problem is not that Flink doesn't remove
>>> files/directories after an incorrect job cancellation. My
>>> problem is different.
>>> To be precise: when everything is OK and the job is in RUNNING
>>> state, each completed checkpoint creates a file named
>>> completedCheckpoint<SomeID> in "high-availability.storageDir"
>>> and a dir in
>>> "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>> After the next checkpoint completes, the previous file and
>>> dir are deleted, and this is OK, because I then always have
>>> only one checkpoint.
>>> But in my case, when the next checkpoint completes, the
>>> previous one is not deleted, and this happens while the job is
>>> in RUNNING state.
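>>>
>>> (The expected behavior, as a tiny sketch with hypothetical names: a
>>> one-slot store where registering a new completed checkpoint discards
>>> the previous one.)
>>>
>>>     import java.io.IOException;
>>>     import java.nio.file.Files;
>>>     import java.nio.file.Path;
>>>
>>>     class OneSlotCheckpointStore {
>>>         private Path latest; // the single retained checkpoint file
>>>
>>>         /** A newly completed checkpoint subsumes the previous one. */
>>>         void add(Path completed) throws IOException {
>>>             if (latest != null) {
>>>                 // This is the delete that, in my cluster, stopped
>>>                 // happening even though the job kept RUNNING.
>>>                 Files.deleteIfExists(latest);
>>>             }
>>>             latest = completed;
>>>         }
>>>     }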
>>>
>>> Maybe you know why those files/dirs are not deleted.
>>>
>>> Best regards
>>>
>>> Szymon Szczypiński
>>>
>>>
>>> On 29.03.2018 11:23, Stephan Ewen wrote:
>>>> Flink removes these files / directories only if you
>>>> properly cancel the job. If you kill the processes
>>>> (stop-cluster.sh) this looks like a failure, and Flink will
>>>> try to recover. The recovery always starts in ZooKeeper,
>>>> not in the DFS.
>>>>
>>>> The best way to prevent this is to
>>>> - properly cancel jobs, not just kill the processes
>>>> - use separate cluster IDs for the standalone clusters,
>>>> so that the new cluster knows that it is not supposed to
>>>> recover the previous jobs and checkpoints (a config sketch
>>>> follows below)
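>>>>
>>>> A flink-conf.yaml sketch for the second point (the id values are
>>>> examples; to my knowledge the option is high-availability.cluster-id):
>>>>
>>>>     # cluster A
>>>>     high-availability.cluster-id: /cluster_a
>>>>
>>>>     # cluster B gets its own id, so it does not try to recover
>>>>     # cluster A's jobs and checkpoints after a restart
>>>>     high-availability.cluster-id: /cluster_b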
>>>>
>>>>
>>>>
>>>> On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński
>>>> <simon_t@poczta.fm> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have a problem with Flink version 1.3.1.
>>>>
>>>> I have a standalone cluster with two JobManagers and four
>>>> TaskManagers; as
>>>> DFS I use Windows highly-available storage mounted via the
>>>> CIFS protocol.
>>>>
>>>> And sometimes the cluster starts having the problem that Flink
>>>> doesn't remove
>>>> checkpoint dirs for jobs and completedCheckpoint files from
>>>> "high-availability.storageDir".
>>>>
>>>> To bring the cluster back to normal operation I need to
>>>> remove all dirs from
>>>> the DFS and start everything from the beginning.
>>>>
>>>>
>>>> Maybe one of the Flink users has had the same problem. For
>>>> now I don't
>>>> have any idea how to bring the cluster back to normal work
>>>> without deleting
>>>> the dirs from the DFS.
>>>>
>>>> I don't want to delete the dirs from the DFS because then I
>>>> need to redeploy
>>>> all the jobs.
>>>>
>>>>
>>>> Best regards
>>>>
>>>> Szymon Szczypiński
>>>>
>>>>
>>>
>>>
>>
>>
>