Posted to issues@flink.apache.org by "Bowen Li (JIRA)" <ji...@apache.org> on 2018/09/28 23:58:00 UTC

[jira] [Commented] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files

    [ https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632656#comment-16632656 ] 

Bowen Li commented on FLINK-9587:
---------------------------------

Wouldn't it be much easier to set a FilePathFilter on the FileInputFormat and simply ignore temporary files by recognizing their naming pattern?
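
A minimal sketch of the name check such a filter could use, assuming the rsync temporary names always look like ".file.txt.uM6MfZ" (a leading dot, the final name, then a dot and six random alphanumeric characters). The class and method names here are hypothetical. In Flink the check would presumably sit inside a FilePathFilter subclass whose filterPath(Path) returns true for paths to skip, registered through FileInputFormat.setFilesFilter(...); that wiring is left out so the snippet runs without Flink on the classpath.

```java
import java.util.regex.Pattern;

// Hypothetical helper: recognizes rsync-style temporary file names.
final class RsyncTempFileMatcher {

    // Assumed pattern: ".<final name>.<6 alphanumeric characters>".
    private static final Pattern RSYNC_TEMP =
            Pattern.compile("\\..+\\.[A-Za-z0-9]{6}");

    private RsyncTempFileMatcher() {
    }

    // Returns true if the bare file name (no directory part) looks like an
    // rsync temporary file and should therefore be ignored by the monitor.
    static boolean isRsyncTempFile(String fileName) {
        return fileName != null && RSYNC_TEMP.matcher(fileName).matches();
    }
}
```

This is only a heuristic: any hidden file whose last extension happens to be six alphanumeric characters would be skipped as well, so the pattern should be adapted to the names that actually appear in the monitored directory.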

> ContinuousFileMonitoringFunction crashes on short living files
> --------------------------------------------------------------
>
>                 Key: FLINK-9587
>                 URL: https://issues.apache.org/jira/browse/FLINK-9587
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, Streaming, Streaming Connectors
>    Affects Versions: 1.5.0
>         Environment: Flink 1.5 running as a standalone cluster.
>            Reporter: Andrei Shumanski
>            Priority: Critical
>             Fix For: 1.7.0, 1.6.2
>
>
> Hi,
>  
> We use Flink to monitor a directory for new files. The filesystem is a MapR FUSE mount that looks like a local FS.
> The files are copied to the directory by another process that uses the rsync command. While a file is not yet completely written, rsync creates a temporary file with a name like ".file.txt.uM6MfZ", where the last extension is a random string.
> When the copying is done, the file is renamed to its final name, "file.txt".
>  
> The bug is that Flink does not correctly handle this behavior and does not take into account that files in the directory might be deleted.
>  
> We are getting error traces:
> {code:java}
> java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
> at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
> at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
> at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
> at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> In LocalFileSystem.listStatus(final Path f) we read the list of files in a directory and then create a LocalFileStatus object for each of the files. But a file might be removed in the interval between these two operations.
> I do not see any way to handle this exception in our own code.
>  
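
The race described in the report (list the directory first, stat each entry afterwards) can be illustrated outside Flink. The sketch below is not Flink's code and not the fix adopted for this issue; it only shows, using plain java.nio.file, one way a listing could tolerate entries that vanish between the two steps. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: stats previously listed paths, skipping vanished ones.
final class TolerantListing {

    private TolerantListing() {
    }

    // Takes the paths returned by an earlier directory listing and returns
    // only those that still exist when their attributes are read.
    static List<Path> statSurvivors(List<Path> listed) throws IOException {
        List<Path> survivors = new ArrayList<>();
        for (Path p : listed) {
            try {
                // Second step of the race: the file may be gone by now.
                Files.readAttributes(p, BasicFileAttributes.class);
                survivors.add(p);
            } catch (NoSuchFileException e) {
                // The file was deleted or renamed after the listing
                // (e.g. an rsync temporary file); skip it instead of failing.
            }
        }
        return survivors;
    }
}
```

Other I/O errors still propagate as IOException; only the "file disappeared" case is treated as harmless.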


