Posted to user@flink.apache.org by Juan Miguel Cejuela <ju...@tagtog.net> on 2018/10/12 18:40:55 UTC

Not all files are processed? Stream source with ContinuousFileMonitoringFunction

Dear flinksters,


I'm using the class `ContinuousFileMonitoringFunction` as a source to
monitor a folder for new incoming files. *The problem is that not all the
files sent to the folder get processed / triggered by the function.*
Specific details of my workflow are that I send up to 1k files per minute
and that I consume the stream as an `AsyncDataStream`.
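Roughly, my setup looks like this (a simplified sketch of the wiring, not
my actual code; the path, monitoring interval, and the `AsyncProcessing`
function are placeholders):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// readFile with PROCESS_CONTINUOUSLY uses ContinuousFileMonitoringFunction
// under the hood to re-scan the directory at the given interval (ms).
DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path("file:///data/incoming")),
        "file:///data/incoming",
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        1000L);

// Hand each record to an asynchronous operator; the last argument is the
// capacity, i.e. the maximum number of in-flight async requests.
DataStream<String> results = AsyncDataStream.unorderedWait(
        lines, new AsyncProcessing(), 5, TimeUnit.MINUTES, 1000);
```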

I raised a separate issue with the `ContinuousFileMonitoringFunction`
class some time ago (https://issues.apache.org/jira/browse/FLINK-8046): if
two or more files share the very same modification timestamp, only the
first one (non-deterministically chosen) is processed. However, I patched
the class myself to fix that problem, using a LinkedHashMap to remember
which files had already been processed. My patch is working fine as far as
I can tell.
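To illustrate the FLINK-8046 behavior and the idea behind my patch, here
is a self-contained sketch (plain Java of my own, not Flink's actual
code): a strictly-greater timestamp filter drops the second of two files
that share a modification time, while remembering seen paths does not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TimestampFilterSketch {

    // Old-style filter: only accept files strictly newer than the max
    // timestamp seen so far. Two files with the same timestamp -> the
    // second one is silently skipped.
    static List<String> filterByTimestamp(Map<String, Long> files) {
        List<String> accepted = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (Map.Entry<String, Long> e : files.entrySet()) {
            if (e.getValue() > maxSeen) {
                accepted.add(e.getKey());
                maxSeen = e.getValue();
            }
        }
        return accepted;
    }

    // Patched idea: remember which paths were already processed, so equal
    // timestamps no longer shadow each other.
    static List<String> filterBySeenPaths(Map<String, Long> files,
                                          LinkedHashMap<String, Long> seen) {
        List<String> accepted = new ArrayList<>();
        for (Map.Entry<String, Long> e : files.entrySet()) {
            if (!seen.containsKey(e.getKey())) {
                seen.put(e.getKey(), e.getValue());
                accepted.add(e.getKey());
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, Long> scan = new LinkedHashMap<>();
        scan.put("a.txt", 100L);
        scan.put("b.txt", 100L); // same modification timestamp as a.txt
        System.out.println(filterByTimestamp(scan));                        // [a.txt]
        System.out.println(filterBySeenPaths(scan, new LinkedHashMap<>())); // [a.txt, b.txt]
    }
}
```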

Rather, the problem seems to be that some files (when many are sent at
once to the same folder) never even get triggered/activated/registered by
the class.


Am I properly explaining my problem?


Any hints to solve this challenge would be greatly appreciated! ❤ THANK YOU

-- 
Juanmi, CEO and co-founder @ 🍃tagtog.net

    Follow tagtog updates on 🐦 Twitter: @tagtog_net
<https://twitter.com/tagtog_net>

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

Posted by Juan Miguel Cejuela <ju...@tagtog.net>.
Update:

Not 100% sure, but I think I fixed my bug. This is what I did:

I reduced (quite a lot) the maximum number of in-flight asynchronous
operations (the capacity) of my `AsyncDataStream`. I had initially set it
to 1000, and the default of 100 did not work for me either. But when I set
the value to 10, everything is working fine now.

```
AsyncDataStream.unorderedWait(
    dataSource, new AsyncProcessing(), 5, TimeUnit.MINUTES, 10);
```

Perhaps too much memory was used at once, and therefore some files were
discarded? I don't know, but hopefully my solution gives some clues to
other people in the future.
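If it helps, my mental model of the capacity parameter is a semaphore
bounding the number of in-flight requests. A plain-Java illustration of my
own (not Flink code) shows that the observed concurrency never exceeds the
configured capacity:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CapacitySketch {

    // Processes `tasks` dummy requests with at most `capacity` in flight,
    // and reports the highest in-flight count actually observed.
    static int run(int tasks, int capacity) throws InterruptedException {
        Semaphore permits = new Semaphore(capacity);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(50);

        for (int i = 0; i < tasks; i++) {
            permits.acquire();               // the "source" blocks here once full
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                inFlight.decrementAndGet();
                permits.release();           // completing a request frees a slot
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return maxInFlight.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Never exceeds the configured capacity, however many tasks arrive.
        System.out.println(run(1000, 10) <= 10); // prints true
    }
}
```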

On Sat, 13 Oct 2018 at 12:48 Juan Miguel Cejuela <ju...@tagtog.net> wrote:

Juanmi, CEO and co-founder @ 🍃tagtog.net

    Follow tagtog updates on 🐦 Twitter: @tagtog_net
<https://twitter.com/tagtog_net>

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

Posted by Juan Miguel Cejuela <ju...@tagtog.net>.
I’m using both a local (Unix) file system and hdfs.

I’m going to check those to get ideas, thank you!

I’m also checking the internal code of the class and my own older patch
code.
On Fri 12. Oct 2018 at 21:32, Fabian Hueske <fh...@gmail.com> wrote:

Juanmi, CEO and co-founder @ 🍃tagtog.net

    Follow tagtog updates on 🐦 Twitter: @tagtog_net
<https://twitter.com/tagtog_net>

Re: Not all files are processed? Stream source with ContinuousFileMonitoringFunction

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Which file system are you reading from? If you are reading from S3, this
might be caused by S3's eventual consistency property.
Have a look at FLINK-9940 [1] for a more detailed discussion.
There is also an open PR [2] that you could try to patch the source
operator with.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9940
[2] https://github.com/apache/flink/pull/6613

On Fri, Oct 12, 2018 at 20:41, Juan Miguel Cejuela <
juanmi@tagtog.net> wrote:
