You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Averell <lv...@gmail.com> on 2018/08/08 05:17:45 UTC

Re: Small-files source - partitioning based on prefix of file

Thank you Fabian.
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./" 
I do not quite understand this statement. If I have read 30 lines from the
checkpoint and sent those 30 records to the next operator, then when the
streaming is recovered - resumed from the last checkpoint, the subsequent
operator would receive those 30 lines again, am I right?

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.
It is clear to me now. Thanks a lot for your help.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Thank you Fabian.

Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

CMCF is not a source, only the file monitoring function is. Barriers are
injected by the FMF when the JM sends a checkpoint message. The barriers
then travel to the CMCF and trigger the Checkpoint ING.

Fabian




Averell <lv...@gmail.com> schrieb am Di., 28. Aug. 2018, 12:02:

> Hello Fabian,
>
> Thanks for the answer. However, my question is a little bit different.
> Let me rephrase my example and my question:
>      * I have 10,000 unsplittable small files to read, which, in total, has
> about 10M output lines.
>      * From Flink's reporting web GUI, I can see that CFMF and
> ContinuousFileReaderOperator (CFRO) are reported separately.
>             - CFMF needs about 10 seconds to generate all 10,000 records
> (as
> you said, in this case, 1 record = 1 file split).
>             - CFRO generates about 2M records per minute (which means CFRO
> is processing at the rate of 2,000 files per minute)
>      * I set the checkpointing interval = 1 minute.
> In this example, /will the 1st barrier be injected into the stream of
> file-splits 50 seconds after the 10,000th split, or after the 2,000th one?/
>
> Sorry for being confusing.
>
> Thanks and best regards,
> Averell
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Hello Fabian,

Thanks for the answer. However, my question is a little bit different.
Let me rephrase my example and my question:
     * I have 10,000 unsplittable small files to read, which, in total, has
about 10M output lines.
     * From Flink's reporting web GUI, I can see that CFMF and
ContinuousFileReaderOperator (CFRO) are reported separately.
            - CFMF needs about 10 seconds to generate all 10,000 records (as
you said, in this case, 1 record = 1 file split).
            - CFRO generates about 2M records per minute (which means CFRO
is processing at the rate of 2,000 files per minute)
     * I set the checkpointing interval = 1 minute.
In this example, /will the 1st barrier be injected into the stream of
file-splits 50 seconds after the 10,000th split, or after the 2,000th one?/

Sorry for being confusing.

Thanks and best regards,
Averell




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

Barriers are injected into the regular data flow by source functions.
In case of a file monitoring source, the barriers are injected into the
stream of file splits that are passed to the
ContinuousFileMonitoringFunction.
The CFMF puts the splits into a queue and processes them with a dedicated
split reader thread. All state modifying operations of the thread (emitting
a record, opening a new split, etc.) are guarded by a checkpoint lock.
When the CFMF receives a barrier, the checkpointing logic requests the lock
and forces the split reader thread to pause. Then it requests the current
state of the thread and writes it into its checkpoint.

In order to be able to properly checkpoint the state of the reading thread
within a split, the InputFormat that is used to read the files must
implement the CheckpointableInputFormat interface. Otherwise, a split will
be read from the start.

Best, Fabian

Am Mo., 27. Aug. 2018 um 10:55 Uhr schrieb Averell <lv...@gmail.com>:

> Hello Fabian, and all,
>
> Please excuse me for digging this old thread up.
> I have a question regarding sending of the "barrier" messages in Flink's
> checkpointing mechanism: I want to know when those barrier messages are
> sent
> when I am using a file source. Where can I find it in the source code?
>
> I'm still with my 20,000 small files issue, when I have all those 20000
> files appear to the ContinuousFileMonitorfingFunction at the same time.
> It is taking only a few seconds to list all those files, but it is expected
> to take about 5 minutes have those 20K files processed till my sink.
> Due to some resources limitation issue, my job fails after about 3 minutes.
> And what is happening after that is the job crashes, gets restored, tries
> to
> process all 20K files from file 1 again, and ultimately fails again after 3
> minutes,... It goes into an indefinite loop.
>
> I think that this is the expected behaviour, as my current checkpoint
> config
> is to checkpoint every 10s, and it took only a second or two for the
> listing
> of those 20K files. Am I correct here? And do we have a solution for this?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Hello Fabian, and all,

Please excuse me for digging this old thread up.
I have a question regarding sending of the "barrier" messages in Flink's
checkpointing mechanism: I want to know when those barrier messages are sent
when I am using a file source. Where can I find it in the source code?

I'm still with my 20,000 small files issue, when I have all those 20000
files appear to the ContinuousFileMonitorfingFunction at the same time.
It is taking only a few seconds to list all those files, but it is expected
to take about 5 minutes have those 20K files processed till my sink.
Due to some resources limitation issue, my job fails after about 3 minutes.
And what is happening after that is the job crashes, gets restored, tries to
process all 20K files from file 1 again, and ultimately fails again after 3
minutes,... It goes into an indefinite loop.

I think that this is the expected behaviour, as my current checkpoint config
is to checkpoint every 10s, and it took only a second or two for the listing
of those 20K files. Am I correct here? And do we have a solution for this?

Thanks and best regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

Conceptually, you are right. Checkpoints are taken at every operator at the
same "logical" time.
It is not important, that each operator checkpoints at the same wallclock
time. Instead, the need to take a checkpoint when they have processed the
same input.
This is implemented with so-called Checkpoint Barriers, which are special
records that are injected at the sources.
[Simplification] When an operator receives a barrier it performs a
checkpoint. [/Simplification]
This way, we do not need to pause the processing of all operators but can
perform the checkpoints locally for each operator.

This page of the Internal docs should help to understand how the mechanism
works in detail [1].

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/stream_checkpointing.html

2018-08-10 14:43 GMT+02:00 Averell <lv...@gmail.com>:

> Thank you Vino, Jorn, and Fabian.
> Please forgive me for my ignorant, as I am still not able to fully
> understand state/checkpointing and the statement that Fabian gave earlier:
> "/In either case, some record will be read twice but if reading position
> can
> be reset, you can still have exactly-once state consistency because the
> state is reset as well./"
>
> My current understanding is: checkpointing is managed at the
> Execution-Environment level, and it would happen at the same time at all
> the
> operators of the pipeline. Is this true?
> My concern here is how to manage that synchronization? It would be quite
> possible that at different operators, checkpointing happens at some
> milliseconds apart, which would lead to duplicated or missed records,
> wouldn't it?
>
> I tried to read Flink's document about managing State  here
> <https://ci.apache.org/projects/flink/flink-docs-
> stable/dev/stream/state/state.html>
> . However, I have not been able to find the information I am looking for.
> Please help point me to the right place.
>
> Thanks and best regards,
> Averell.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Thank you Vino, Jorn, and Fabian.
Please forgive me for my ignorant, as I am still not able to fully
understand state/checkpointing and the statement that Fabian gave earlier:
"/In either case, some record will be read twice but if reading position can
be reset, you can still have exactly-once state consistency because the
state is reset as well./"

My current understanding is: checkpointing is managed at the
Execution-Environment level, and it would happen at the same time at all the
operators of the pipeline. Is this true?
My concern here is how to manage that synchronization? It would be quite
possible that at different operators, checkpointing happens at some
milliseconds apart, which would lead to duplicated or missed records,
wouldn't it?

I tried to read Flink's document about managing State  here
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html> 
. However, I have not been able to find the information I am looking for.
Please help point me to the right place.

Thanks and best regards,
Averell.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averell,

One comment regarding what you said:

> As my files are small, I think there would not be much benefit in
checkpointing file offset state.

Checkpointing is not about efficiency but about consistency.
If the position in a split is not checkpointed, your application won't
operate with exactly-once state consistency unless each split produces
exactly one record.

Best, Fabian

2018-08-10 9:10 GMT+02:00 Jörn Franke <jo...@gmail.com>:

> Or you write a custom file system for Flink... (for  the tar part).
> Unfortunately gz files can only be processed single threaded (there are
> some multiple thread implementation but they don’t bring the big gain).
>
> On 10. Aug 2018, at 07:07, vino yang <ya...@gmail.com> wrote:
>
> Hi Averell,
>
> In this case, I think you may need to extend Flink's existing source.
> First, read your tar.gz large file, when it been decompressed, use the
> multi-threaded ability to read the record in the source, and then parse the
> data format (map / flatmap  might be a suitable operator, you can chain
> them with source because these two operator don't require data shuffle).
>
> Note that Flink doesn't encourage creating extra threads in UDFs, but I
> don't know if there is a better way for this scenario.
>
> Thanks, vino.
>
> Averell <lv...@gmail.com> 于2018年8月10日周五 下午12:05写道:
>
>> Hi Fabian, Vino,
>>
>> I have one more question, which I initially planned to create a new
>> thread,
>> but now I think it is better to ask here:
>> I need to process one big tar.gz file which contains multiple small gz
>> files. What is the best way to do this? I am thinking of having one single
>> thread process that read the TarArchiveStream (which has been decompressed
>> from that tar.gz by Flink automatically), and then distribute the
>> TarArchiveEntry entries to a multi-thread operator which would process the
>> small files in parallel. If this is feasible, which elements from Flink I
>> can reuse?
>>
>> Thanks a lot.
>> Regards,
>> Averell
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
>> n4.nabble.com/
>>
>

Re: Small-files source - partitioning based on prefix of file

Posted by Jörn Franke <jo...@gmail.com>.
Or you write a custom file system for Flink... (for  the tar part).
Unfortunately gz files can only be processed single threaded (there are some multiple thread implementation but they don’t bring the big gain). 

> On 10. Aug 2018, at 07:07, vino yang <ya...@gmail.com> wrote:
> 
> Hi Averell,
> 
> In this case, I think you may need to extend Flink's existing source. 
> First, read your tar.gz large file, when it been decompressed, use the multi-threaded ability to read the record in the source, and then parse the data format (map / flatmap  might be a suitable operator, you can chain them with source because these two operator don't require data shuffle).
> 
> Note that Flink doesn't encourage creating extra threads in UDFs, but I don't know if there is a better way for this scenario.
> 
> Thanks, vino.
> 
> Averell <lv...@gmail.com> 于2018年8月10日周五 下午12:05写道:
>> Hi Fabian, Vino,
>> 
>> I have one more question, which I initially planned to create a new thread,
>> but now I think it is better to ask here:
>> I need to process one big tar.gz file which contains multiple small gz
>> files. What is the best way to do this? I am thinking of having one single
>> thread process that read the TarArchiveStream (which has been decompressed
>> from that tar.gz by Flink automatically), and then distribute the
>> TarArchiveEntry entries to a multi-thread operator which would process the
>> small files in parallel. If this is feasible, which elements from Flink I
>> can reuse?
>> 
>> Thanks a lot.
>> Regards,
>> Averell
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

In this case, I think you may need to extend Flink's existing source.
First, read your tar.gz large file, when it been decompressed, use the
multi-threaded ability to read the record in the source, and then parse the
data format (map / flatmap  might be a suitable operator, you can chain
them with source because these two operator don't require data shuffle).

Note that Flink doesn't encourage creating extra threads in UDFs, but I
don't know if there is a better way for this scenario.

Thanks, vino.

Averell <lv...@gmail.com> 于2018年8月10日周五 下午12:05写道:

> Hi Fabian, Vino,
>
> I have one more question, which I initially planned to create a new thread,
> but now I think it is better to ask here:
> I need to process one big tar.gz file which contains multiple small gz
> files. What is the best way to do this? I am thinking of having one single
> thread process that read the TarArchiveStream (which has been decompressed
> from that tar.gz by Flink automatically), and then distribute the
> TarArchiveEntry entries to a multi-thread operator which would process the
> small files in parallel. If this is feasible, which elements from Flink I
> can reuse?
>
> Thanks a lot.
> Regards,
> Averell
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Hi Fabian, Vino,

I have one more question, which I initially planned to create a new thread,
but now I think it is better to ask here:
I need to process one big tar.gz file which contains multiple small gz
files. What is the best way to do this? I am thinking of having one single
thread process that read the TarArchiveStream (which has been decompressed
from that tar.gz by Flink automatically), and then distribute the
TarArchiveEntry entries to a multi-thread operator which would process the
small files in parallel. If this is feasible, which elements from Flink I
can reuse?

Thanks a lot.
Regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Averell <lv...@gmail.com>.
Thank you Vino and Fabien for your help in answering my questions. As my
files are small, I think there would not be much benefit in checkpointing
file offset state.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Small-files source - partitioning based on prefix of file

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Averall,

As Vino said, checkpoints store the state of all operators of an
application.
The state of a monitoring source function is the position in the currently
read split and all splits that have been received and are currently pending.

In case of a recovery, the splits are recovered and the source is reset to
the split that was read when the checkpoint was taken and set to the
correct reading position.
Once, that is done, records that have been read before are read again.
However, that does not affect the exactly-once guarantees of the operators
state because all of them have been reset to the same position.

Best, Fabian

2018-08-08 9:26 GMT+02:00 vino yang <ya...@gmail.com>:

> Hi Averell,
>
> You need to understand that Flink reflects the recovery of the state, not
> the recovery of the record.
> Of course, sometimes your record is state, but sometimes the intermediate
> result of your record is the state.
> It depends on your business logic and your operators.
>
> Thanks, vino.
>
> Averell <lv...@gmail.com> 于2018年8月8日周三 下午1:17写道:
>
>> Thank you Fabian.
>> "/In either case, some record will be read twice but if reading position
>> can
>> be reset, you can still have exactly-once state consistency because the
>> state is reset as well./"
>> I do not quite understand this statement. If I have read 30 lines from the
>> checkpoint and sent those 30 records to the next operator, then when the
>> streaming is recovered - resumed from the last checkpoint, the subsequent
>> operator would receive those 30 lines again, am I right?
>>
>> Thanks!
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>

Re: Small-files source - partitioning based on prefix of file

Posted by vino yang <ya...@gmail.com>.
Hi Averell,

You need to understand that Flink reflects the recovery of the state, not
the recovery of the record.
Of course, sometimes your record is state, but sometimes the intermediate
result of your record is the state.
It depends on your business logic and your operators.

Thanks, vino.

Averell <lv...@gmail.com> 于2018年8月8日周三 下午1:17写道:

> Thank you Fabian.
> "/In either case, some record will be read twice but if reading position
> can
> be reset, you can still have exactly-once state consistency because the
> state is reset as well./"
> I do not quite understand this statement. If I have read 30 lines from the
> checkpoint and sent those 30 records to the next operator, then when the
> streaming is recovered - resumed from the last checkpoint, the subsequent
> operator would receive those 30 lines again, am I right?
>
> Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>