Posted to issues@flink.apache.org by "Andrei Shumanski (JIRA)" <ji...@apache.org> on 2018/06/14 10:24:00 UTC

[jira] [Created] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files

Andrei Shumanski created FLINK-9587:
---------------------------------------

             Summary: ContinuousFileMonitoringFunction crashes on short living files
                 Key: FLINK-9587
                 URL: https://issues.apache.org/jira/browse/FLINK-9587
             Project: Flink
          Issue Type: Bug
          Components: FileSystem, Streaming, Streaming Connectors
    Affects Versions: 1.5.0
         Environment: Flink 1.5 running as a standalone cluster.
            Reporter: Andrei Shumanski


Hi,

 

We use Flink to monitor a directory for new files. The filesystem is a MapR FUSE mount that looks like a local FS.

The files are copied to the directory by another process that uses the rsync command. While a file is still being written, rsync creates a temporary file with a name like ".file.txt.uM6MfZ", where the last extension is a random string.

When the copying is done, the file is renamed to its final name, "file.txt".
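
For illustration, the write-to-temporary-name-then-rename pattern described above can be sketched in Java roughly as follows. This is only a sketch of the general pattern, not rsync's actual implementation; the class name TempThenRename and the method publish are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, for illustration only.
final class TempThenRename {

    /** Writes data under a hidden temporary name, then renames it to finalName. */
    public static Path publish(Path dir, String finalName, byte[] data) throws IOException {
        // Creates a hidden file such as ".file.txt.<random>" in the target directory.
        // The empty suffix prevents the default ".tmp" extension from being appended.
        Path tmp = Files.createTempFile(dir, "." + finalName + ".", "");
        Files.write(tmp, data);
        // After the move, the temporary name no longer exists in the directory,
        // so anyone who listed the directory earlier may hold a stale name.
        return Files.move(tmp, dir.resolve(finalName), StandardCopyOption.REPLACE_EXISTING);
    }
}
```

A monitor that lists the directory while publish is running can see the temporary name, which then disappears as soon as the rename happens.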

 

The bug is that Flink does not handle this behavior correctly: it does not take into account that files in the monitored directory might be deleted or renamed while it is scanning.

 

We are getting error traces:
{code:java}
java.io.FileNotFoundException: File file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or the user running Flink ('root') has insufficient permissions to access it.
at org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
at org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
at org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
{code}
In LocalFileSystem.listStatus(final Path f) we first read the list of file names in a directory and then create a LocalFileStatus object for each of them. But a file might be removed in the interval between these two operations, in which case getFileStatus throws the FileNotFoundException shown above.
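
One possible way to tolerate this race is to skip entries that vanish between the listing and the per-file status lookup. The sketch below shows the idea with plain java.nio.file calls rather than Flink's LocalFileSystem; it is not a patch against Flink, and the class name TolerantListing and its methods are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, for illustration only.
final class TolerantListing {

    /** Step 1: read the names currently present in the directory. */
    public static List<Path> listNames(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.collect(Collectors.toList());
        }
    }

    /** Step 2: look up the status of each name, skipping files that are already gone. */
    public static Map<Path, BasicFileAttributes> statExisting(List<Path> names) throws IOException {
        Map<Path, BasicFileAttributes> result = new LinkedHashMap<>();
        for (Path p : names) {
            try {
                result.put(p, Files.readAttributes(p, BasicFileAttributes.class));
            } catch (NoSuchFileException e) {
                // The file was deleted or renamed after step 1; ignore it
                // instead of failing the whole listing.
            }
        }
        return result;
    }
}
```

With this approach a short-lived temporary file that disappears after step 1 is simply left out of the result, while other I/O errors still propagate to the caller.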

I do not see any option to handle this exception in our code.

 


