You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by XilangYan <xi...@gmail.com> on 2018/03/20 01:11:23 UTC

Let BucketingSink roll file on each checkpoint

The behavior of BucketingSink is not exactly we want. 
If I understood correctly, when checkpoint requested, BucketingSink will
flush writer to make sure data not loss, but will not close file, nor roll
new file after checkpoint.
In the case of HDFS, if file length is not updated to name node(through
close file or update file length specifically), MR or other data analysis
tool will not read new data. This is not we desired.
I also want to open new file for each checkpoint period to make sure HDFS
file is persistent, because we met some bugs in flush/append hdfs file user
case.

Is there anyway to let BucketingSink roll file on each checkpoint? Thanks in
advance.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by zhangminglei <18...@163.com>.
Hi, Xilang

You can watch the jira what you referred to. I will work on this in the next couple of days.

Cheers
Minglei

> 在 2018年7月9日,上午9:50,XilangYan <xi...@gmail.com> 写道:
> 
> Hi Febian,
> 
> With watermark, I understand it could only write those that are smaller than
> the received watermark, but could I know why it would also need to maintain
> a write ahead log of all received rows? When an event received, it just
> compare time with current watermark,  write it to correct bucket if smaller
> then watermark, otherwise drop it.
> 
> With with assumption, BucketingSink could close all bucket that is older
> than current watermark, I think it  make sense as those bucket data won't
> change  anymore. The close action could be done in checkpoint callback or
> when every event receive.
> 
> It implemented the BucketReady mechanism @Minglei suggested in
> https://issues.apache.org/jira/browse/FLINK-9609 using current watermark
> mechanism. And I think we don't need BucketWatermark concept, as it confuse
> with current watermark.
> 
> Thanks,
> Xilang
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Hi Febian,

With watermark, I understand it could only write those that are smaller than
the received watermark, but could I know why it would also need to maintain
a write ahead log of all received rows? When an event received, it just
compare time with current watermark,  write it to correct bucket if smaller
then watermark, otherwise drop it.

With with assumption, BucketingSink could close all bucket that is older
than current watermark, I think it  make sense as those bucket data won't
change  anymore. The close action could be done in checkpoint callback or
when every event receive.

It implemented the BucketReady mechanism @Minglei suggested in
https://issues.apache.org/jira/browse/FLINK-9609 using current watermark
mechanism. And I think we don't need BucketWatermark concept, as it confuse
with current watermark.

Thanks,
Xilang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Xilang,

I thought about this again.
The bucketing sink would need to roll on event-time intervals (similar to
the current processing time rolling) which are triggered by watermarks in
order to support consistency.
However, it would also need to maintain a write ahead log of all received
rows and could only write those that are smaller than the received
watermark. This would make the whole sink more complex.

I'm not sure that rolling on checkpoint barriers is a good solution either.
IMO, the checkpointing interval and file rolling interval should not depend
on each other because it mixes different requirements and introduces
challenging trade-offs.

Best, Fabian


2018-07-04 11:59 GMT+02:00 XilangYan <xi...@gmail.com>:

> Hi Fabian,
>
> We did need a consistent view of data, we need the Counter and HDFS file to
> be consistent. For example, when the Counter indicate there is 1000 message
> wrote to the HDFS, there must be exactly 1000 messages in HDFS ready for
> read.
>
> The data we write to HDFS is collected by an Agent(which also send Counter
> message to count message number received), data has a timestamp and we use
> BucktingSink to write data into different bucket.
>
> Could you give me a clue on how to achieve this with watermark. As my
> understanding, watermark is designed to process out-of-order data with a
> know delay, how it can be used to make my CounterSink and BuckingSink
> consistent.
>
> Thanks, Xilang
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Hi Fabian,

We did need a consistent view of data, we need the Counter and HDFS file to
be consistent. For example, when the Counter indicate there is 1000 message
wrote to the HDFS, there must be exactly 1000 messages in HDFS ready for
read.

The data we write to HDFS is collected by an Agent(which also send Counter
message to count message number received), data has a timestamp and we use
BucktingSink to write data into different bucket.

Could you give me a clue on how to achieve this with watermark. As my
understanding, watermark is designed to process out-of-order data with a
know delay, how it can be used to make my CounterSink and BuckingSink
consistent. 

Thanks, Xilang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Xilang,

Let me try to summarize your requirements.
If I understood you correctly, you are not only concerned about the
exactly-once guarantees but also need a consistent view of the data.
The data in all files that are finalized need to originate from a prefix of
the stream, i.e., all records written to the finalized files must have a
timestamp smaller or equal than T and there might not be a record with a
timestamp larger than T.

I think this can be achieved with the current implementation and event-time
processing. If the result of a job is emitted by a timer which is triggered
by watermarks, you will have the prefix property (even taking out-of-order
records into account!).
The reason is that watermarks and checkpoint barriers are shipped like
regular data records. By using timers, all computations are "synchonized"
by the watermarks which cannot be overtaken by checkpoint barriers.

Best, Fabian

2018-07-02 4:22 GMT+02:00 XilangYan <xi...@gmail.com>:

> Thank you Minglei,
>
> I should describe my current flow and my requirement more clearly.
> 1. any data we collect have a send-time
> 2. when collect data, we also send another counter message, says we have
> collect 45 message whose send-time is 2018-07-02 10:02:00
> 3. data is sent to kafka(or other message system), and flink receives data
> from kafka and write to HDFS
> 4. when flink finished part of messages(neither .pending nor .inprogress,
> when "finish" it must be finished state that can be read by other system),
> we send another counter message, says we have processed 40 message whose
> send-time is  2018-07-02 10:02:00
>
> What i have did in flink is :
> 1. I add a config to BucktingSink, the config name is rolloverOnCheckpoint
> 2. I add another sink says CounterSink which counts message by send-time
> 2. in BucktingSink.snapshotState, if rolloverOnCheckpoint is set to true,
> I
> close current files and move them   to pending state
> 3. in CounterSink.snapshotState I prepare to send the special counter
> message
> 4. when checkpoint completed BucktingSink.notifyCheckpointComplete will
> move
> pending files to finish state, CounterSink.notifyCheckpointComplete  will
> send the special counter message
>
> So in our counter-system, when the processed-message-counter is equal to
> the
> received-message-counter, it meas ETL can continue their jobs.
>
> The jira you submitted is not exactly what I want, however it will be great
> if we can figure out a common solution to this requirement, although I
> think
> it is difficult unless, as you said, we add some assumption like watermark.
> On the other side, I think watermark may be able to archived by use the
> combination of inactiveBucketThreashold and batchRolloverInterval already.
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Thank you Minglei,

I should describe my current flow and my requirement more clearly.
1. any data we collect have a send-time
2. when collect data, we also send another counter message, says we have
collect 45 message whose send-time is 2018-07-02 10:02:00
3. data is sent to kafka(or other message system), and flink receives data
from kafka and write to HDFS
4. when flink finished part of messages(neither .pending nor .inprogress,
when "finish" it must be finished state that can be read by other system),
we send another counter message, says we have processed 40 message whose
send-time is  2018-07-02 10:02:00

What i have did in flink is :
1. I add a config to BucktingSink, the config name is rolloverOnCheckpoint
2. I add another sink says CounterSink which counts message by send-time
2. in BucktingSink.snapshotState, if rolloverOnCheckpoint is set to true,  I
close current files and move them   to pending state
3. in CounterSink.snapshotState I prepare to send the special counter
message
4. when checkpoint completed BucktingSink.notifyCheckpointComplete will move
pending files to finish state, CounterSink.notifyCheckpointComplete  will
send the special counter message

So in our counter-system, when the processed-message-counter is equal to the
received-message-counter, it meas ETL can continue their jobs.

The jira you submitted is not exactly what I want, however it will be great
if we can figure out a common solution to this requirement, although I think
it is difficult unless, as you said, we add some assumption like watermark.
On the other side, I think watermark may be able to archived by use the
combination of inactiveBucketThreashold and batchRolloverInterval already.







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by zhangminglei <18...@163.com>.
By the way, I do not think below is a correct way. As @ Fabian said. The BucketingSink closes files once they reached a certain size (BatchSize) or have not been written to for a certain amount of time (InactiveBucketThreshold). 

> . If we can close
> file during checkpoint, then the result is accurately. 

And please take a look on BucketingSink code. Says, there are closed files that we are not currently writing to ….. But which were not yet confirmed by a checkpoint.

/**
 * The suffix for {@code pending} part files. These are closed files that we are
 * not currently writing to (inactive or reached {@link #batchSize}), but which
 * were not yet confirmed by a checkpoint.
 */
private static final String DEFAULT_PENDING_SUFFIX = ".pending";
After checkpoint, the file name neither .pending nor .inprogress. So ,you can check your files name under every bucket to let the ETL team know when a bucket is ready for use.

Cheers
Minglei



> 在 2018年6月29日,上午9:03,XilangYan <xi...@gmail.com> 写道:
> 
> Hi Febian,
> 
> Finally I have time to read the code, and it is brilliant it does provide
> exactly once guarantee。
> However I still suggest to add the function that can close a file when
> checkpoint made. I noticed that there is an enhancement
> https://issues.apache.org/jira/browse/FLINK-9138 which can close file on a
> time based rollover, but it is not very accurate.
> My user case is we read data from message queue, write to HDFS, and our ETL
> team will use the data in HDFS. In the case, ETL need to know if all data is
> ready to be read accurately, so we use a counter to count how many data has
> been wrote, if the counter is equal to the number we received, we think HDFS
> file is ready. We send the counter message in a custom sink so ETL can know
> how many data has been wrote, but if use current BucketingSink, even through
> HDFS file is flushed, ETL may still cannot read the data. If we can close
> file during checkpoint, then the result is accurately. And for the HDFS
> small file problem, it can be controller by use bigger checkpoint interval.
> 
> I did take the BuckingSink code and adapt our case, but if it can be done in
> Flink, we can save to time to maintain our own branch.
> 
> Thanks!
> Jeffrey
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Let BucketingSink roll file on each checkpoint

Posted by zhangminglei <18...@163.com>.
Hi Xilang

I think you are doing a together work with the offline team. Also what you said ETL, ETL team want to use the data in HDFS. I would like to confirm one question from you. What is their scheduling time for every job ? 5mins or 10 mins ? 

> My user case is we read data from message queue, write to HDFS, and our ETL
> team will use the data in HDFS. ETL need to know if all data is
> ready to be read accurately

I think you want to find a functionality that let the ETL team know when a bucket is ready for them to use. Correct ? If yes, please take a look on this jira : https://issues.apache.org/jira/browse/FLINK-9609 <https://issues.apache.org/jira/browse/FLINK-9609>

Cheers
Minglei


> 在 2018年6月29日,上午9:03,XilangYan <xi...@gmail.com> 写道:
> 
> Hi Febian,
> 
> Finally I have time to read the code, and it is brilliant it does provide
> exactly once guarantee。
> However I still suggest to add the function that can close a file when
> checkpoint made. I noticed that there is an enhancement
> https://issues.apache.org/jira/browse/FLINK-9138 which can close file on a
> time based rollover, but it is not very accurate.
> My user case is we read data from message queue, write to HDFS, and our ETL
> team will use the data in HDFS. In the case, ETL need to know if all data is
> ready to be read accurately, so we use a counter to count how many data has
> been wrote, if the counter is equal to the number we received, we think HDFS
> file is ready. We send the counter message in a custom sink so ETL can know
> how many data has been wrote, but if use current BucketingSink, even through
> HDFS file is flushed, ETL may still cannot read the data. If we can close
> file during checkpoint, then the result is accurately. And for the HDFS
> small file problem, it can be controller by use bigger checkpoint interval.
> 
> I did take the BuckingSink code and adapt our case, but if it can be done in
> Flink, we can save to time to maintain our own branch.
> 
> Thanks!
> Jeffrey
> 
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Hi Febian,

Finally I have time to read the code, and it is brilliant it does provide
exactly once guarantee。
However I still suggest to add the function that can close a file when
checkpoint made. I noticed that there is an enhancement
https://issues.apache.org/jira/browse/FLINK-9138 which can close file on a
time based rollover, but it is not very accurate.
My user case is we read data from message queue, write to HDFS, and our ETL
team will use the data in HDFS. In the case, ETL need to know if all data is
ready to be read accurately, so we use a counter to count how many data has
been wrote, if the counter is equal to the number we received, we think HDFS
file is ready. We send the counter message in a custom sink so ETL can know
how many data has been wrote, but if use current BucketingSink, even through
HDFS file is flushed, ETL may still cannot read the data. If we can close
file during checkpoint, then the result is accurately. And for the HDFS
small file problem, it can be controller by use bigger checkpoint interval.

I did take the BuckingSink code and adapt our case, but if it can be done in
Flink, we can save to time to maintain our own branch.

Thanks!
Jeffrey




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Ok, then may be I have misunderstanding about checkpoint. 
I thought flink use checkpoint to store offset, but when kafka connector
making a checkpoint, it doesn't know whether data is in in-progress file or
in pending-file so a whole offset is saved in checkpoint. I used to guess,
the data in in-progress file may be lost when checkpoint requested. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Flink maintains its own Kafka offsets in its checkpoints and does not rely
on Kafka's offset management.
That way Flink guarantees that read offsets and checkpointed operator state
are always aligned.
The BucketingSink is designed to not lose any data and the mode of
operation is described in detail in JavaDocs of the class [1].

Please check the JavaDocs and let me know if you have questions or doubts
about the mechanism.

Best, Fabian

[1]
https://github.com/apache/flink/blob/release-1.4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java

2018-03-21 7:38 GMT+01:00 XilangYan <xi...@gmail.com>:

> Thank you! Fabian
>
> HDFS small file problem can be avoid with big checkpoint interval.
>
> Meanwhile, there is potential data lose problem in current BucketingSink.
> Say we consume data in kafka, when checkpoint is requested, kafka offset is
> update, but in-progress file in BucketingSink is remained. If flink crushed
> after that, data in the in-progress file is lost. Am I right?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Thank you! Fabian

HDFS small file problem can be avoid with big checkpoint interval.

Meanwhile, there is potential data lose problem in current BucketingSink.
Say we consume data in kafka, when checkpoint is requested, kafka offset is
update, but in-progress file in BucketingSink is remained. If flink crushed
after that, data in the in-progress file is lost. Am I right?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/