You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Szymon Szczypiński <si...@poczta.fm> on 2018/03/28 21:22:18 UTC

DFS problem with removing checkpoint

Hi,

i have problem with Flink in version 1.3.1.

I have standalone cluster with two JobManagers and four TaskManager, as
DFS i use windows high available storage mounted by cifs protocol.

And sometimes i'm starting having problem that Flink doesn't remove
checkpoint dirs for job and completedCheckpoint files from
"high-availability.storageDir".

To bring back cluster to normal working i need to remove all dirs from
DFS and start everything from beginning.


Maybe someone of Flink users had the same problem. For now i doesn't
have any idea how to bring back cluster to normal work without deleting
dirs from DFS.

I don't want to delete dirs from DFS because than  i need to redeploy
all jobs.


Best regards

Szymon Szczypiński


Re: DFS problem with removing checkpoint

Posted by Szymon Szczypiński <si...@poczta.fm>.
Hi,
now i know why those files wasn't "remove". They remove but very slow. 
In my case(Flink 1.3) the problem is in line

client.delete().inBackground(backgroundCallback, executor).forPath(path);

where deletion is in background in executor pool where size is equal to 
2. When i have more files/dirs in "high-availability.storageDir" and 
"state.backend.fs.checkpointdir"
then delete operation are longer and longer and queued operation in pool 
are increase. In my case the main problem is that i have 12 job deployed 
on cluster and checkpoint is set for 5 seconds.

I know that i need to increase timeout between checkpoints, i will 
increase to 1 or 5 minutes depends from job businesses logic.

But i still have some question. Where is set size of  executor pool size 
because i was analyzing the flink code and still don't know where the 
size is set. Maybe someone can of users know where pool is created.



Best regards

On 22.04.2018 17:22, Szymon Szczypiński wrote:
> HI,
> the problem was started on 1.3.1. Now I upgraded to Flink 1.3.3.
> I changed my cluster to 1.3.3  because of jira 
> https://issues.apache.org/jira/browse/FLINK-8807.
>
> I will check in debug mode why cluster doesn't remove those files, 
> maybe i will see why.
>
> Best regards
>
> On 22.04.2018 16:59, Stephan Ewen wrote:
>> Hi!
>>
>> Sorry for the late response... In which Flink version are you?
>>
>> I am wondering if this is somewhat related to that specific setup: 
>> Windows DFS filesystem mounted on Linux with CIFS
>>
>>   - For the "completedCheckpoint<SomeID>", the cleanup should happen 
>> in the "ZooKeeperCompletedCheckpointStore" when dropping a checkpoint
>> - For the "state.backend.fs.checkpointdir/JobId/check-<INTEGER>" 
>> directory, it should (in Flink 1.3 and 1.4) be the FileStateHandle 
>> that deletes the parent directory when empty, meaning the last state 
>> chunk to be deleted deletes the parent directory. In Flink 1.5, it is 
>> the disposal call of the CheckpointStorageLocation.
>>
>> Best,
>> Stephan
>>
>> On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński <simon_t@poczta.fm 
>> <ma...@poczta.fm>> wrote:
>>
>>     Hi,
>>
>>     in my case both doesn't deleted. In high-availability.storageDir
>>     the number of files of type "completedCheckpoint<SomeID>" are
>>     growing and also dirs in 
>>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>
>>     In my case i have Windows DFS filesystem mounted on linux with
>>     cifs protocol.
>>
>>     Can you give me a hint or description which process is
>>     responsible for removing those files and directories.
>>
>>     Best regards
>>
>>
>>     W dniu 2018-04-02 o 15:58, Stephan Ewen pisze:
>>>     Can you clarify which one does not get deleted? The file in the
>>>     "high-availability.storageDir", or the
>>>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?
>>>
>>>     Could you also tell us which file system you use?
>>>
>>>     There is a known issue in some versions of Flink that S3
>>>     "directories" are not deleted. This means that Hadoop's S3
>>>     marker files (the way that Hadoop's s3n and s3a  imitate
>>>     directories in S3) are not deleted. This is fixed in Flink 1.5.
>>>     A workaround for Flink 1.4 is to use the "flink-s3-fs-presto",
>>>     which does not uses these marker files.
>>>
>>>
>>>     On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński
>>>     <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>>
>>>         Thank you for your replay.
>>>
>>>         But my problem is not that Flink doesn't remove
>>>         files/directories after incorrect job cancellation. My
>>>         problem is different.
>>>         Precisely when everything is ok and job is in RUNNING state,
>>>         when checkpoint for job is done, then there is created file
>>>         in "high-availability.storageDir" with name
>>>         completedCheckpoint<SomeID> and also is created dir in
>>>         location
>>>         "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>>>         After the next checkpoint is completed, previous file and
>>>         dir are deleted, and this is ok, because i always have only
>>>         one checkpoint.
>>>         But in my case when next checkpoint is completed, the
>>>         previous is not deleted and this happens when job is in
>>>         running state.
>>>
>>>         My be you know why those files/dirs are not deleted.
>>>
>>>         Best regards
>>>
>>>         Szymon Szczypiński
>>>
>>>
>>>         On 29.03.2018 11:23, Stephan Ewen wrote:
>>>>         Flink removes these files / directories only if you
>>>>         properly cancel the job. If you kill the processes
>>>>         (stop-cluster.sh) this looks like a failure, and Flink will
>>>>         try to recover. The recovery always starts in ZooKeeper,
>>>>         not in the DFS.
>>>>
>>>>         Best way to prevent this is to
>>>>           - properly cancel jobs, not just kill processes
>>>>           - use separate cluster IDs for the standalone clusters,
>>>>         so that the new cluster knows that it is not supposed to
>>>>         recover the previous jobs and checkpoints
>>>>
>>>>
>>>>
>>>>         On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński
>>>>         <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>>>
>>>>             Hi,
>>>>
>>>>             i have problem with Flink in version 1.3.1.
>>>>
>>>>             I have standalone cluster with two JobManagers and four
>>>>             TaskManager, as
>>>>             DFS i use windows high available storage mounted by
>>>>             cifs protocol.
>>>>
>>>>             And sometimes i'm starting having problem that Flink
>>>>             doesn't remove
>>>>             checkpoint dirs for job and completedCheckpoint files from
>>>>             "high-availability.storageDir".
>>>>
>>>>             To bring back cluster to normal working i need to
>>>>             remove all dirs from
>>>>             DFS and start everything from beginning.
>>>>
>>>>
>>>>             Maybe someone of Flink users had the same problem. For
>>>>             now i doesn't
>>>>             have any idea how to bring back cluster to normal work
>>>>             without deleting
>>>>             dirs from DFS.
>>>>
>>>>             I don't want to delete dirs from DFS because than  i
>>>>             need to redeploy
>>>>             all jobs.
>>>>
>>>>
>>>>             Best regards
>>>>
>>>>             Szymon Szczypiński
>>>>
>>>>
>>>
>>>
>>
>>
>


Re: DFS problem with removing checkpoint

Posted by Szymon Szczypiński <si...@poczta.fm>.
HI,
the problem was started on 1.3.1. Now I upgraded to Flink 1.3.3.
I changed my cluster to 1.3.3  because of jira 
https://issues.apache.org/jira/browse/FLINK-8807.

I will check in debug mode why cluster doesn't remove those files, maybe 
i will see why.

Best regards

On 22.04.2018 16:59, Stephan Ewen wrote:
> Hi!
>
> Sorry for the late response... In which Flink version are you?
>
> I am wondering if this is somewhat related to that specific setup: 
> Windows DFS filesystem mounted on Linux with CIFS
>
>   - For the "completedCheckpoint<SomeID>", the cleanup should happen 
> in the "ZooKeeperCompletedCheckpointStore" when dropping a checkpoint
> - For the "state.backend.fs.checkpointdir/JobId/check-<INTEGER>" 
> directory, it should (in Flink 1.3 and 1.4) be the FileStateHandle 
> that deletes the parent directory when empty, meaning the last state 
> chunk to be deleted deletes the parent directory. In Flink 1.5, it is 
> the disposal call of the CheckpointStorageLocation.
>
> Best,
> Stephan
>
> On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński <simon_t@poczta.fm 
> <ma...@poczta.fm>> wrote:
>
>     Hi,
>
>     in my case both doesn't deleted. In high-availability.storageDir
>     the number of files of type "completedCheckpoint<SomeID>" are
>     growing and also dirs in 
>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>
>     In my case i have Windows DFS filesystem mounted on linux with
>     cifs protocol.
>
>     Can you give me a hint or description which process is responsible
>     for removing those files and directories.
>
>     Best regards
>
>
>     W dniu 2018-04-02 o 15:58, Stephan Ewen pisze:
>>     Can you clarify which one does not get deleted? The file in the
>>     "high-availability.storageDir", or the
>>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?
>>
>>     Could you also tell us which file system you use?
>>
>>     There is a known issue in some versions of Flink that S3
>>     "directories" are not deleted. This means that Hadoop's S3 marker
>>     files (the way that Hadoop's s3n and s3a imitate directories in
>>     S3) are not deleted. This is fixed in Flink 1.5. A workaround for
>>     Flink 1.4 is to use the "flink-s3-fs-presto", which does not uses
>>     these marker files.
>>
>>
>>     On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński
>>     <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>
>>         Thank you for your replay.
>>
>>         But my problem is not that Flink doesn't remove
>>         files/directories after incorrect job cancellation. My
>>         problem is different.
>>         Precisely when everything is ok and job is in RUNNING state,
>>         when checkpoint for job is done, then there is created file
>>         in "high-availability.storageDir" with name
>>         completedCheckpoint<SomeID> and also is created dir in
>>         location
>>         "state.backend.fs.checkpointdir/JobId/check-<INTEGER>". After
>>         the next checkpoint is completed, previous file and dir are
>>         deleted, and this is ok, because i always have only one
>>         checkpoint.
>>         But in my case when next checkpoint is completed, the
>>         previous is not deleted and this happens when job is in
>>         running state.
>>
>>         My be you know why those files/dirs are not deleted.
>>
>>         Best regards
>>
>>         Szymon Szczypiński
>>
>>
>>         On 29.03.2018 11:23, Stephan Ewen wrote:
>>>         Flink removes these files / directories only if you properly
>>>         cancel the job. If you kill the processes (stop-cluster.sh)
>>>         this looks like a failure, and Flink will try to recover.
>>>         The recovery always starts in ZooKeeper, not in the DFS.
>>>
>>>         Best way to prevent this is to
>>>           - properly cancel jobs, not just kill processes
>>>           - use separate cluster IDs for the standalone clusters, so
>>>         that the new cluster knows that it is not supposed to
>>>         recover the previous jobs and checkpoints
>>>
>>>
>>>
>>>         On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński
>>>         <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>>
>>>             Hi,
>>>
>>>             i have problem with Flink in version 1.3.1.
>>>
>>>             I have standalone cluster with two JobManagers and four
>>>             TaskManager, as
>>>             DFS i use windows high available storage mounted by cifs
>>>             protocol.
>>>
>>>             And sometimes i'm starting having problem that Flink
>>>             doesn't remove
>>>             checkpoint dirs for job and completedCheckpoint files from
>>>             "high-availability.storageDir".
>>>
>>>             To bring back cluster to normal working i need to remove
>>>             all dirs from
>>>             DFS and start everything from beginning.
>>>
>>>
>>>             Maybe someone of Flink users had the same problem. For
>>>             now i doesn't
>>>             have any idea how to bring back cluster to normal work
>>>             without deleting
>>>             dirs from DFS.
>>>
>>>             I don't want to delete dirs from DFS because than  i
>>>             need to redeploy
>>>             all jobs.
>>>
>>>
>>>             Best regards
>>>
>>>             Szymon Szczypiński
>>>
>>>
>>
>>
>
>


Re: DFS problem with removing checkpoint

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Sorry for the late response... In which Flink version are you?

I am wondering if this is somewhat related to that specific setup:   Windows
DFS filesystem mounted on Linux with CIFS

  - For the "completedCheckpoint<SomeID>", the cleanup should happen in the
"ZooKeeperCompletedCheckpointStore" when dropping a checkpoint
  - For the "state.backend.fs.checkpointdir/JobId/check-<INTEGER>"
directory, it should (in Flink 1.3 and 1.4) be the FileStateHandle that
deletes the parent directory when empty, meaning the last state chunk to be
deleted deletes the parent directory. In Flink 1.5, it is the disposal call
of the CheckpointStorageLocation.

Best,
Stephan

On Sat, Apr 7, 2018 at 1:11 AM, Szymon Szczypiński <si...@poczta.fm>
wrote:

> Hi,
>
> in my case both doesn't deleted. In high-availability.storageDir the
> number of files of type "completedCheckpoint<SomeID>" are growing and also
> dirs in  "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>
> In my case i have Windows DFS filesystem mounted on linux with cifs
> protocol.
>
> Can you give me a hint or description which process is responsible for
> removing those files and directories.
>
> Best regards
>
> W dniu 2018-04-02 o 15:58, Stephan Ewen pisze:
>
> Can you clarify which one does not get deleted? The file in the
> "high-availability.storageDir", or the  "state.backend.fs.checkpointdi
> r/JobId/check-<INTEGER>", or both?
>
> Could you also tell us which file system you use?
>
> There is a known issue in some versions of Flink that S3 "directories" are
> not deleted. This means that Hadoop's S3 marker files (the way that
> Hadoop's s3n and s3a  imitate directories in S3) are not deleted. This is
> fixed in Flink 1.5. A workaround for Flink 1.4 is to use the
> "flink-s3-fs-presto", which does not uses these marker files.
>
>
> On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński <si...@poczta.fm>
> wrote:
>
>> Thank you for your replay.
>>
>> But my problem is not that Flink doesn't remove files/directories after
>> incorrect job cancellation. My problem is different.
>> Precisely when everything is ok and job is in RUNNING state, when
>> checkpoint for job is done, then there is created file in
>> "high-availability.storageDir" with name completedCheckpoint<SomeID> and
>> also is created dir in location "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".
>> After the next checkpoint is completed, previous file and dir are deleted,
>> and this is ok, because i always have only one checkpoint.
>> But in my case when next checkpoint is completed, the previous is not
>> deleted and this happens when job is in running state.
>>
>> My be you know why those files/dirs are not deleted.
>>
>> Best regards
>>
>> Szymon Szczypiński
>>
>>
>> On 29.03.2018 11:23, Stephan Ewen wrote:
>>
>> Flink removes these files / directories only if you properly cancel the
>> job. If you kill the processes (stop-cluster.sh) this looks like a failure,
>> and Flink will try to recover. The recovery always starts in ZooKeeper, not
>> in the DFS.
>>
>> Best way to prevent this is to
>>   - properly cancel jobs, not just kill processes
>>   - use separate cluster IDs for the standalone clusters, so that the new
>> cluster knows that it is not supposed to recover the previous jobs and
>> checkpoints
>>
>>
>>
>> On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński <si...@poczta.fm>
>> wrote:
>>
>>> Hi,
>>>
>>> i have problem with Flink in version 1.3.1.
>>>
>>> I have standalone cluster with two JobManagers and four TaskManager, as
>>> DFS i use windows high available storage mounted by cifs protocol.
>>>
>>> And sometimes i'm starting having problem that Flink doesn't remove
>>> checkpoint dirs for job and completedCheckpoint files from
>>> "high-availability.storageDir".
>>>
>>> To bring back cluster to normal working i need to remove all dirs from
>>> DFS and start everything from beginning.
>>>
>>>
>>> Maybe someone of Flink users had the same problem. For now i doesn't
>>> have any idea how to bring back cluster to normal work without deleting
>>> dirs from DFS.
>>>
>>> I don't want to delete dirs from DFS because than  i need to redeploy
>>> all jobs.
>>>
>>>
>>> Best regards
>>>
>>> Szymon Szczypiński
>>>
>>>
>>
>>
>
>

Re: DFS problem with removing checkpoint

Posted by Szymon Szczypiński <si...@poczta.fm>.
Hi,

in my case both doesn't deleted. In high-availability.storageDir the 
number of files of type "completedCheckpoint<SomeID>" are growing and 
also dirs in "state.backend.fs.checkpointdir/JobId/check-<INTEGER>".

In my case i have Windows DFS filesystem mounted on linux with cifs 
protocol.

Can you give me a hint or description which process is responsible for 
removing those files and directories.

Best regards


W dniu 2018-04-02 o 15:58, Stephan Ewen pisze:
> Can you clarify which one does not get deleted? The file in the 
> "high-availability.storageDir", or the 
> "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?
>
> Could you also tell us which file system you use?
>
> There is a known issue in some versions of Flink that S3 "directories" 
> are not deleted. This means that Hadoop's S3 marker files (the way 
> that Hadoop's s3n and s3a  imitate directories in S3) are not deleted. 
> This is fixed in Flink 1.5. A workaround for Flink 1.4 is to use the 
> "flink-s3-fs-presto", which does not uses these marker files.
>
>
> On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński <simon_t@poczta.fm 
> <ma...@poczta.fm>> wrote:
>
>     Thank you for your replay.
>
>     But my problem is not that Flink doesn't remove files/directories
>     after incorrect job cancellation. My problem is different.
>     Precisely when everything is ok and job is in RUNNING state, when
>     checkpoint for job is done, then there is created file in
>     "high-availability.storageDir" with name
>     completedCheckpoint<SomeID> and also is created dir in location
>     "state.backend.fs.checkpointdir/JobId/check-<INTEGER>". After the
>     next checkpoint is completed, previous file and dir are deleted,
>     and this is ok, because i always have only one checkpoint.
>     But in my case when next checkpoint is completed, the previous is
>     not deleted and this happens when job is in running state.
>
>     My be you know why those files/dirs are not deleted.
>
>     Best regards
>
>     Szymon Szczypiński
>
>
>     On 29.03.2018 11:23, Stephan Ewen wrote:
>>     Flink removes these files / directories only if you properly
>>     cancel the job. If you kill the processes (stop-cluster.sh) this
>>     looks like a failure, and Flink will try to recover. The recovery
>>     always starts in ZooKeeper, not in the DFS.
>>
>>     Best way to prevent this is to
>>       - properly cancel jobs, not just kill processes
>>       - use separate cluster IDs for the standalone clusters, so that
>>     the new cluster knows that it is not supposed to recover the
>>     previous jobs and checkpoints
>>
>>
>>
>>     On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński
>>     <simon_t@poczta.fm <ma...@poczta.fm>> wrote:
>>
>>         Hi,
>>
>>         i have problem with Flink in version 1.3.1.
>>
>>         I have standalone cluster with two JobManagers and four
>>         TaskManager, as
>>         DFS i use windows high available storage mounted by cifs
>>         protocol.
>>
>>         And sometimes i'm starting having problem that Flink doesn't
>>         remove
>>         checkpoint dirs for job and completedCheckpoint files from
>>         "high-availability.storageDir".
>>
>>         To bring back cluster to normal working i need to remove all
>>         dirs from
>>         DFS and start everything from beginning.
>>
>>
>>         Maybe someone of Flink users had the same problem. For now i
>>         doesn't
>>         have any idea how to bring back cluster to normal work
>>         without deleting
>>         dirs from DFS.
>>
>>         I don't want to delete dirs from DFS because than  i need to
>>         redeploy
>>         all jobs.
>>
>>
>>         Best regards
>>
>>         Szymon Szczypiński
>>
>>
>
>


Re: DFS problem with removing checkpoint

Posted by Stephan Ewen <se...@apache.org>.
Can you clarify which one does not get deleted? The file in the
"high-availability.storageDir",
or the  "state.backend.fs.checkpointdir/JobId/check-<INTEGER>", or both?

Could you also tell us which file system you use?

There is a known issue in some versions of Flink that S3 "directories" are
not deleted. This means that Hadoop's S3 marker files (the way that
Hadoop's s3n and s3a  imitate directories in S3) are not deleted. This is
fixed in Flink 1.5. A workaround for Flink 1.4 is to use the
"flink-s3-fs-presto", which does not uses these marker files.


On Thu, Mar 29, 2018 at 8:49 PM, Szymon Szczypiński <si...@poczta.fm>
wrote:

> Thank you for your replay.
>
> But my problem is not that Flink doesn't remove files/directories after
> incorrect job cancellation. My problem is different.
> Precisely when everything is ok and job is in RUNNING state, when
> checkpoint for job is done, then there is created file in
> "high-availability.storageDir" with name completedCheckpoint<SomeID> and
> also is created dir in location "state.backend.fs.
> checkpointdir/JobId/check-<INTEGER>". After the next checkpoint is
> completed, previous file and dir are deleted, and this is ok, because i
> always have only one checkpoint.
> But in my case when next checkpoint is completed, the previous is not
> deleted and this happens when job is in running state.
>
> My be you know why those files/dirs are not deleted.
>
> Best regards
>
> Szymon Szczypiński
>
>
> On 29.03.2018 11:23, Stephan Ewen wrote:
>
> Flink removes these files / directories only if you properly cancel the
> job. If you kill the processes (stop-cluster.sh) this looks like a failure,
> and Flink will try to recover. The recovery always starts in ZooKeeper, not
> in the DFS.
>
> Best way to prevent this is to
>   - properly cancel jobs, not just kill processes
>   - use separate cluster IDs for the standalone clusters, so that the new
> cluster knows that it is not supposed to recover the previous jobs and
> checkpoints
>
>
>
> On Wed, Mar 28, 2018 at 11:22 PM, Szymon Szczypiński <si...@poczta.fm>
> wrote:
>
>> Hi,
>>
>> i have problem with Flink in version 1.3.1.
>>
>> I have standalone cluster with two JobManagers and four TaskManager, as
>> DFS i use windows high available storage mounted by cifs protocol.
>>
>> And sometimes i'm starting having problem that Flink doesn't remove
>> checkpoint dirs for job and completedCheckpoint files from
>> "high-availability.storageDir".
>>
>> To bring back cluster to normal working i need to remove all dirs from
>> DFS and start everything from beginning.
>>
>>
>> Maybe someone of Flink users had the same problem. For now i doesn't
>> have any idea how to bring back cluster to normal work without deleting
>> dirs from DFS.
>>
>> I don't want to delete dirs from DFS because than  i need to redeploy
>> all jobs.
>>
>>
>> Best regards
>>
>> Szymon Szczypiński
>>
>>
>
>