You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Cliff Resnick (JIRA)" <ji...@apache.org> on 2016/11/16 20:18:58 UTC

[jira] [Created] (FLINK-5083) Race condition in Rolling/Bucketing Sink pending files cleanup

Cliff Resnick created FLINK-5083:
------------------------------------

             Summary: Race condition in Rolling/Bucketing Sink pending files cleanup
                 Key: FLINK-5083
                 URL: https://issues.apache.org/jira/browse/FLINK-5083
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.1.3, 1.2.0
            Reporter: Cliff Resnick


In both Open and Restore methods there is code that:

1. gets a recursive listing from baseDir
2. iterates listing and name checks filenames based on subtaskIndex and other criteria to find pending or in-progress files. If found delete.

The problem is that the recursive listing gets all files for all subtaskIndexes. The race error is when #hasNext is called as part of the iteration, a hidden existence check is made on the "next" file, which was deleted by another task after-listing but pre-iteration, so an error is thrown and the job fails. 

Depending on the number of pending files, this condition may outlast the number of job retries, each failing on a different file.

A solution would be use #listStatus instead. The hadoop FileSystem supports a PathFilter in its #listStatus calls, but not in the recursive #listFiles call. The cleanup is performed from the baseDir so the recursive listing would have to be in Flink. 

This touches on another issue. Over time, the directory listing is bound to get very large, and re-listing everything from the baseDir may get increasingly expensive, especially if the Fs is S3. Maybe we can have a Bucketer callback to return a list of cleanup root directories based on the current file? I'm guessing most people are using time based bucketing, so there's only so much of a period where cleanup will matter. If so, then this would solve for the above recursive listing problem.








 




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)