Posted to user@flink.apache.org by Juan Miguel Cejuela <ju...@tagtog.net> on 2017/11/10 11:54:23 UTC

readFile, DataStream

Hi there,

I’m trying to watch a directory for new incoming files (with
StreamExecutionEnvironment#readFile) with a subsecond latency (interval
watch of ~100ms, and using the flag FileProcessingMode.PROCESS_CONTINUOUSLY
).
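
For reference, the setup looks roughly like this (a minimal sketch; the
directory and the TextInputFormat are illustrative, not my actual job):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
String dir = "/data/incoming"; // the watched directory (illustrative)

DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(dir)),
        dir,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        100L); // watch interval in milliseconds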

If many files come in within (that is, under) the watch interval, Flink
doesn’t seem to notice all of them, and as a result some files never get
processed. The behavior also seems nondeterministic, as it likely depends
on timing. For example, 10 new files come in at once (that is, essentially
in parallel), and perhaps 3 of them get processed, but the remaining 7
don’t.

I’ve created my own FileInputFormat subclass, which does little more than
log, in its open function, whenever a new file comes in. That’s how I know
that many files get lost.
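
In case it helps, the subclass is essentially this (a minimal sketch; the
class name is illustrative):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class LoggingTextInputFormat extends TextInputFormat {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingTextInputFormat.class);

    public LoggingTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        // log every file split the source actually opens
        LOG.info("Opening {}", split.getPath());
        super.open(split);
    }
}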

On the other hand, when I restart Flink, *all* the files in the directory
are immediately processed. This is the expected behavior and works fine.

The situation of unprocessed files is a bummer.

Am I doing something wrong? Do I need to set something in the
configuration? Is it a bug in Flink?

Hopefully I described my problem clearly.

Thank you.

Re: readFile, DataStream

Posted by Juan Miguel Cejuela <ju...@tagtog.net>.
Hi Kostas,

thank you very much for your answer.

Yes, I proposed the change in https://github.com/apache/flink/pull/4997 to
compare modificationTime < globalModificationTime (that is, not accepting
equality). Later, however, I realized that, as you correctly point out,
this creates duplicates: files whose modification time equals the current
global timestamp get collected again on every scan.

The original and now deprecated FileMonitoringFunction.java indeed kept a
map of filenames to their timestamps.

That works, but the memory cost is likely too high for my application, as
I may process millions of files.

What I’ve done so far is to create my own
MyPatchedContinuousFileMonitoringFunction with a similar map, implemented
as a LinkedHashMap so that it is capped at a desired maximum number of
entries:

private volatile Map<String, Boolean> filenamesAlreadySeen =
        new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // evict the oldest file name once the map exceeds MAX_ENTRIES
                return size() > MAX_ENTRIES;
            }
        };
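
(Overriding removeEldestEntry is the standard way to bound a
LinkedHashMap’s size. Note that the default constructor keeps insertion
order, and re-putting an existing key does not refresh its position, so
this behaves as a FIFO of file names; constructing the map with
accessOrder=true would make it LRU instead.)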

and then changed shouldIgnore to:

private boolean shouldIgnore(Path filePath, long modificationTime) {
    assert (Thread.holdsLock(checkpointLock));
    boolean alreadySeen = filenamesAlreadySeen.containsKey(filePath.getName());
    boolean shouldIgnore = alreadySeen || modificationTime < globalModificationTime;
    filenamesAlreadySeen.put(filePath.getName(), true);

    if (shouldIgnore && LOG.isDebugEnabled()) {
        LOG.debug("Ignoring " + filePath + ", with mod time= " + modificationTime +
                " and global mod time= " + globalModificationTime);
    }
    return shouldIgnore;
}

This is a partial solution that works for me for now. However, it’s still
a hack, tailored to my particular case.

I think the real solution would be to also use the accessTime (not only
the modificationTime). However, as I pointed out in the GitHub pull
request, as of Flink 1.3.2 the access time is always 0, at least on my
machine and local file system (macOS).
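
For what it’s worth, one way to check what the file system reports is
something like this (a sketch against Flink’s FileSystem API; the
directory is illustrative):

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

FileSystem fs = FileSystem.getLocalFileSystem();
for (FileStatus status : fs.listStatus(new Path("/data/incoming"))) {
    System.out.println(status.getPath()
            + " modTime=" + status.getModificationTime()
            + " accessTime=" + status.getAccessTime());
}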


Re: readFile, DataStream

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Juan,

The problem is that once a file with a certain timestamp is processed and
the global modification timestamp is updated, all files with that
timestamp are considered processed.

The solution is not to remove the = from modificationTime <=
globalModificationTime in ContinuousFileMonitoringFunction, as this would
lead to duplicates.
The solution, in my opinion, is to keep a list of the filenames (or
hashes) of the files processed for the last globalModTimestamp (and only
for that timestamp), and when new files arrive with that same timestamp,
check whether their names are already in that list.

This way you pay a bit of memory but you get what you want.
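
For concreteness, a sketch of the idea (names are illustrative, this is
not the actual ContinuousFileMonitoringFunction code):

import java.util.HashSet;
import java.util.Set;

// track only the file names seen at the current maximum timestamp
private long globalModificationTime = Long.MIN_VALUE;
private final Set<String> seenAtMaxTimestamp = new HashSet<>();

private boolean shouldIgnore(String fileName, long modificationTime) {
    if (modificationTime < globalModificationTime) {
        return true; // strictly older than everything already processed
    }
    if (modificationTime > globalModificationTime) {
        // time advanced: older names can never collide again, drop them
        globalModificationTime = modificationTime;
        seenAtMaxTimestamp.clear();
    }
    // same timestamp as the newest seen so far: ignore only the files
    // already processed at this exact timestamp
    return !seenAtMaxTimestamp.add(fileName);
}

The memory used stays bounded by the number of files sharing the newest
timestamp.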

What do you think?

Thanks,
Kostas
