You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Krzysztof Chmielewski <kr...@gmail.com> on 2022/01/05 23:03:46 UTC

StaticFileSplitEnumerator - keeping track 9f already processed files

Hi,
Why StaticFileSplitEnumerator from FileSource does not track the already
processed files similar to how ContinuousFileSplitEnumerator does?

I'm thinking about scenario where we have a Bounded FileSource that reads a
lot of files using FileSource and stream it's content to Kafka.If there
will be a Job/cluster restart then we will process same files again.

Regards,
Krzysztof Chmielewski

Re: StaticFileSplitEnumerator - keeping track 9f already processed files

Posted by Fabian Paul <fp...@apache.org>.
Hi Krzysztof,

Thanks for your investigation. Can you maybe share the code with us?
collectWithClient will insert a custom sink into the datastream that
buffers all incoming records and make them queryable. It is already
deprecated and one should use executeAndCollect that fulfills the same
purpose.

There is a difference between the execution modes streaming and batch,
and boundedness of a source. It is possible to execute a bounded
source in streaming mode and therefore have checkpoints.

For your experiments did you only change the boundedness or also the
runtime mode? [1]

Best,
Fabian

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Tue, Jan 11, 2022 at 12:04 PM Krzysztof Chmielewski
<kr...@gmail.com> wrote:
>
> Hi Fabian,
> Thank you for your input and I'm sorry for delay on my part.
>
> Before I will create a ticket I would like to ask about one thing more.
> There is test FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover
> This test uses DataStreamUtils.collectWithClient(...) which returns an iterator that we later to get the processing results.
>
> I did a quick PoC where I created my own FileSource that uses alreadyProcessedFiles Set in Bounded mode, it is based on FileSource implementation.
> I noticed some issues with this test, when i use it for my Bounded Split Enumerator that keeps track of already processed files. For example
>
> Case 1:
> a) set execution mode to Streaming
> b) set checkpoint Interval to 10 milis
> Result : test fails because result has fever records that it is expected, actually it reports zero records in the result.
>
> Case 2:
> a) set execution mode to Streaming
> b) disable checkpoint
> Result: test passes
>
> Case 3:
> a) set execution mode to Bounded
> b) disable checkpoint
> Result Test Passes
>
> Case 3:
> a) set execution mode to Bounded
> b) enable checkpoint
> Result Test Passes (since checkpoints are ignored in BATCH mode)
>
> I looked at testBoundedTextFileSource and testContinuousTextFileSource methods and I understand idea how the Cluster failover is trigger and whatnot.
> Although I do see that gathering the final results for Continuous mode is slightly different.
> Could you shed some light on this and how the collectWithClient works especially in case if Failover.
>
> Thanks,
> Krzysztof Chmielewski
>
> czw., 6 sty 2022 o 09:29 Fabian Paul <fp...@apache.org> napisał(a):
>>
>> Hi,
>>
>> I think your analysis is correct. One thing to note here is that I
>> guess when implementing the StaticFileSplitEnumerator we only thought
>> about the batch case where no checkpoints exist [1] on the other hand
>> it is possible as you have noted to run a bounded source in streaming
>> mode.
>>
>> Although in the current implementation we already checkpoint the
>> remaining splits of the StaticFileSplitEnumerator so it should be easy
>> to also pass the alreadyDiscoveredPaths to the
>> StaticFileSplitEnumerator.
>>
>> @Krzysztof Chmielewski can you create a ticket for that?
>>
>> Best,
>> Fabian
>>
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>>
>> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
>> <kr...@gmail.com> wrote:
>> >
>> > Hi,
>> > Yes I know that ContinuousFileSplitEnumerator has continuously scan the monitored folder for the new files and StaticFileSplitEnumerator does not, this is clear.
>> >
>> > However I was asking about a different scenario, the scenario when we are restoring from a checkpoint.
>> > FileSource can process many files, not only one. The underlying API uses array of paths not just single Path.
>> >
>> > If I understand correctly, when we are recovering from a checkpoint, for example due to Job Manager issue, FileEnumerator will create an Array of Splits and pass it to StaticFileSplitEnumerator.
>> >
>> > Same goes for ContinuousFileSplitEnumerator. However  when ContinuousFileSplitEnumerator is started, it iterates through Path[] array and checks which files were already processed and skip them using pathsAlreadyProcessed set hence not creating Splits for those files.
>> >
>> > However it seems that StaticFileSplitEnumerator will reprocess files that were already used for Split creation. In case of Checkpoint restoration it does not check if that file was already processed.
>> >
>> > Regards,
>> > Krzysztof Chmielewski
>> >
>> >
>> >
>> >
>> > czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <ts...@gmail.com> napisał:
>> >>
>> >> Hi!
>> >>
>> >> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>> >>
>> >> This is because ContinuousFileSplitEnumerator has to continuously add new files to splitAssigner, while StaticFileSplitEnumerator does not. The pathsAlreadyProcessed set records the paths already discovered by ContinuousFileSplitEnumerator so that it will not add the same file to splitAssigner twice. For StaticFileSplitEnumerator it does not need to discover new files and all files have already been recorded in its splitAssigner so it does not need the pathsAlreadyProcessed set.
>> >>
>> >> For more detailed logic check the caller of the constructors of both enumerators.
>> >>
>> >> Krzysztof Chmielewski <kr...@gmail.com> 于2022年1月6日周四 07:04写道:
>> >>>
>> >>> Hi,
>> >>> Why StaticFileSplitEnumerator from FileSource does not track the already processed files similar to how ContinuousFileSplitEnumerator does?
>> >>>
>> >>> I'm thinking about scenario where we have a Bounded FileSource that reads a lot of files using FileSource and stream it's content to Kafka.If there will be a Job/cluster restart then we will process same files again.
>> >>>
>> >>> Regards,
>> >>> Krzysztof Chmielewski

Re: StaticFileSplitEnumerator - keeping track 9f already processed files

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi Fabian,
Thank you for your input and I'm sorry for delay on my part.

Before I will create a ticket I would like to ask about one thing more.
There is
test FileSourceTextLinesITCase::testBoundedTextFileSourceWithJobManagerFailover
This test uses DataStreamUtils.collectWithClient(...) which returns an
iterator that we later to get the processing results.

I did a quick PoC where I created my own FileSource that uses
alreadyProcessedFiles Set in Bounded mode, it is based on FileSource
implementation.
I noticed some issues with this test, when i use it for my Bounded Split
Enumerator that keeps track of already processed files. For example

Case 1:
a) set execution mode to Streaming
b) set checkpoint Interval to 10 milis
Result : test fails because result has fever records that it is expected,
actually it reports zero records in the result.

Case 2:
a) set execution mode to Streaming
b) disable checkpoint
Result: test passes

Case 3:
a) set execution mode to Bounded
b) disable checkpoint
Result Test Passes

Case 3:
a) set execution mode to Bounded
b) enable checkpoint
Result Test Passes (since checkpoints are ignored in BATCH mode)

I looked at testBoundedTextFileSource and testContinuousTextFileSource
methods and I understand idea how the Cluster failover is trigger and
whatnot.
Although I do see that gathering the final results for Continuous mode is
slightly different.
Could you shed some light on this and how the collectWithClient works
especially in case if Failover.

Thanks,
Krzysztof Chmielewski

czw., 6 sty 2022 o 09:29 Fabian Paul <fp...@apache.org> napisał(a):

> Hi,
>
> I think your analysis is correct. One thing to note here is that I
> guess when implementing the StaticFileSplitEnumerator we only thought
> about the batch case where no checkpoints exist [1] on the other hand
> it is possible as you have noted to run a bounded source in streaming
> mode.
>
> Although in the current implementation we already checkpoint the
> remaining splits of the StaticFileSplitEnumerator so it should be easy
> to also pass the alreadyDiscoveredPaths to the
> StaticFileSplitEnumerator.
>
> @Krzysztof Chmielewski can you create a ticket for that?
>
> Best,
> Fabian
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming
>
> On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
> <kr...@gmail.com> wrote:
> >
> > Hi,
> > Yes I know that ContinuousFileSplitEnumerator has continuously scan the
> monitored folder for the new files and StaticFileSplitEnumerator does not,
> this is clear.
> >
> > However I was asking about a different scenario, the scenario when we
> are restoring from a checkpoint.
> > FileSource can process many files, not only one. The underlying API uses
> array of paths not just single Path.
> >
> > If I understand correctly, when we are recovering from a checkpoint, for
> example due to Job Manager issue, FileEnumerator will create an Array of
> Splits and pass it to StaticFileSplitEnumerator.
> >
> > Same goes for ContinuousFileSplitEnumerator. However  when
> ContinuousFileSplitEnumerator is started, it iterates through Path[] array
> and checks which files were already processed and skip them using
> pathsAlreadyProcessed set hence not creating Splits for those files.
> >
> > However it seems that StaticFileSplitEnumerator will reprocess files
> that were already used for Split creation. In case of Checkpoint
> restoration it does not check if that file was already processed.
> >
> > Regards,
> > Krzysztof Chmielewski
> >
> >
> >
> >
> > czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <ts...@gmail.com>
> napisał:
> >>
> >> Hi!
> >>
> >> Do you mean the pathsAlreadyProcessed set in
> ContinuousFileSplitEnumerator?
> >>
> >> This is because ContinuousFileSplitEnumerator has to continuously add
> new files to splitAssigner, while StaticFileSplitEnumerator does not. The
> pathsAlreadyProcessed set records the paths already discovered by
> ContinuousFileSplitEnumerator so that it will not add the same file to
> splitAssigner twice. For StaticFileSplitEnumerator it does not need to
> discover new files and all files have already been recorded in its
> splitAssigner so it does not need the pathsAlreadyProcessed set.
> >>
> >> For more detailed logic check the caller of the constructors of both
> enumerators.
> >>
> >> Krzysztof Chmielewski <kr...@gmail.com> 于2022年1月6日周四
> 07:04写道:
> >>>
> >>> Hi,
> >>> Why StaticFileSplitEnumerator from FileSource does not track the
> already processed files similar to how ContinuousFileSplitEnumerator does?
> >>>
> >>> I'm thinking about scenario where we have a Bounded FileSource that
> reads a lot of files using FileSource and stream it's content to Kafka.If
> there will be a Job/cluster restart then we will process same files again.
> >>>
> >>> Regards,
> >>> Krzysztof Chmielewski
>

Re: StaticFileSplitEnumerator - keeping track 9f already processed files

Posted by Fabian Paul <fp...@apache.org>.
Hi,

I think your analysis is correct. One thing to note here is that I
guess when implementing the StaticFileSplitEnumerator we only thought
about the batch case where no checkpoints exist [1] on the other hand
it is possible as you have noted to run a bounded source in streaming
mode.

Although in the current implementation we already checkpoint the
remaining splits of the StaticFileSplitEnumerator so it should be easy
to also pass the alreadyDiscoveredPaths to the
StaticFileSplitEnumerator.

@Krzysztof Chmielewski can you create a ticket for that?

Best,
Fabian


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/#execution-mode-batchstreaming

On Thu, Jan 6, 2022 at 9:13 AM Krzysztof Chmielewski
<kr...@gmail.com> wrote:
>
> Hi,
> Yes I know that ContinuousFileSplitEnumerator has continuously scan the monitored folder for the new files and StaticFileSplitEnumerator does not, this is clear.
>
> However I was asking about a different scenario, the scenario when we are restoring from a checkpoint.
> FileSource can process many files, not only one. The underlying API uses array of paths not just single Path.
>
> If I understand correctly, when we are recovering from a checkpoint, for example due to Job Manager issue, FileEnumerator will create an Array of Splits and pass it to StaticFileSplitEnumerator.
>
> Same goes for ContinuousFileSplitEnumerator. However  when ContinuousFileSplitEnumerator is started, it iterates through Path[] array and checks which files were already processed and skip them using pathsAlreadyProcessed set hence not creating Splits for those files.
>
> However it seems that StaticFileSplitEnumerator will reprocess files that were already used for Split creation. In case of Checkpoint restoration it does not check if that file was already processed.
>
> Regards,
> Krzysztof Chmielewski
>
>
>
>
> czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <ts...@gmail.com> napisał:
>>
>> Hi!
>>
>> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>>
>> This is because ContinuousFileSplitEnumerator has to continuously add new files to splitAssigner, while StaticFileSplitEnumerator does not. The pathsAlreadyProcessed set records the paths already discovered by ContinuousFileSplitEnumerator so that it will not add the same file to splitAssigner twice. For StaticFileSplitEnumerator it does not need to discover new files and all files have already been recorded in its splitAssigner so it does not need the pathsAlreadyProcessed set.
>>
>> For more detailed logic check the caller of the constructors of both enumerators.
>>
>> Krzysztof Chmielewski <kr...@gmail.com> 于2022年1月6日周四 07:04写道:
>>>
>>> Hi,
>>> Why StaticFileSplitEnumerator from FileSource does not track the already processed files similar to how ContinuousFileSplitEnumerator does?
>>>
>>> I'm thinking about scenario where we have a Bounded FileSource that reads a lot of files using FileSource and stream it's content to Kafka.If there will be a Job/cluster restart then we will process same files again.
>>>
>>> Regards,
>>> Krzysztof Chmielewski

Re: StaticFileSplitEnumerator - keeping track 9f already processed files

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Hi,
Yes I know that ContinuousFileSplitEnumerator has continuously scan the
monitored folder for the new files and StaticFileSplitEnumerator does not,
this is clear.

However I was asking about a different scenario, the scenario when we are
restoring from a checkpoint.
FileSource can process many files, not only one. The underlying API uses
array of paths not just single Path.

If I understand correctly, when we are recovering from a checkpoint, for
example due to Job Manager issue, FileEnumerator will create an Array of
Splits and pass it to StaticFileSplitEnumerator.

Same goes for ContinuousFileSplitEnumerator. However  when
ContinuousFileSplitEnumerator is started, it iterates through Path[] array
and checks which files were already processed and skip them using
pathsAlreadyProcessed set hence not creating Splits for those files.

However it seems that StaticFileSplitEnumerator will reprocess files that
were already used for Split creation. In case of Checkpoint restoration it
does not check if that file was already processed.

Regards,
Krzysztof Chmielewski




czw., 6 sty 2022, 03:21 użytkownik Caizhi Weng <ts...@gmail.com>
napisał:

> Hi!
>
> Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?
>
> This is because ContinuousFileSplitEnumerator has to continuously add new
> files to splitAssigner, while StaticFileSplitEnumerator does not.
> The pathsAlreadyProcessed set records the paths already discovered
> by ContinuousFileSplitEnumerator so that it will not add the same file to
> splitAssigner twice. For StaticFileSplitEnumerator it does not need to
> discover new files and all files have already been recorded in its
> splitAssigner so it does not need the pathsAlreadyProcessed set.
>
> For more detailed logic check the caller of the constructors of both
> enumerators.
>
> Krzysztof Chmielewski <kr...@gmail.com> 于2022年1月6日周四
> 07:04写道:
>
>> Hi,
>> Why StaticFileSplitEnumerator from FileSource does not track the already
>> processed files similar to how ContinuousFileSplitEnumerator does?
>>
>> I'm thinking about scenario where we have a Bounded FileSource that reads
>> a lot of files using FileSource and stream it's content to Kafka.If there
>> will be a Job/cluster restart then we will process same files again.
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>

Re: StaticFileSplitEnumerator - keeping track 9f already processed files

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

Do you mean the pathsAlreadyProcessed set in ContinuousFileSplitEnumerator?

This is because ContinuousFileSplitEnumerator has to continuously add new
files to splitAssigner, while StaticFileSplitEnumerator does not.
The pathsAlreadyProcessed set records the paths already discovered
by ContinuousFileSplitEnumerator so that it will not add the same file to
splitAssigner twice. For StaticFileSplitEnumerator it does not need to
discover new files and all files have already been recorded in its
splitAssigner so it does not need the pathsAlreadyProcessed set.

For more detailed logic check the caller of the constructors of both
enumerators.

Krzysztof Chmielewski <kr...@gmail.com> 于2022年1月6日周四 07:04写道:

> Hi,
> Why StaticFileSplitEnumerator from FileSource does not track the already
> processed files similar to how ContinuousFileSplitEnumerator does?
>
> I'm thinking about scenario where we have a Bounded FileSource that reads
> a lot of files using FileSource and stream it's content to Kafka.If there
> will be a Job/cluster restart then we will process same files again.
>
> Regards,
> Krzysztof Chmielewski
>