You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by XilangYan <xi...@gmail.com> on 2018/07/02 02:22:55 UTC

Re: Let BucketingSink roll file on each checkpoint

Thank you Minglei,

I should describe my current flow and my requirement more clearly.
1. any data we collect have a send-time
2. when collect data, we also send another counter message, says we have
collect 45 message whose send-time is 2018-07-02 10:02:00
3. data is sent to kafka(or other message system), and flink receives data
from kafka and write to HDFS
4. when flink finished part of messages(neither .pending nor .inprogress,
when "finish" it must be finished state that can be read by other system),
we send another counter message, says we have processed 40 message whose
send-time is  2018-07-02 10:02:00

What i have did in flink is :
1. I add a config to BucktingSink, the config name is rolloverOnCheckpoint
2. I add another sink says CounterSink which counts message by send-time
2. in BucktingSink.snapshotState, if rolloverOnCheckpoint is set to true,  I
close current files and move them   to pending state
3. in CounterSink.snapshotState I prepare to send the special counter
message
4. when checkpoint completed BucktingSink.notifyCheckpointComplete will move
pending files to finish state, CounterSink.notifyCheckpointComplete  will
send the special counter message

So in our counter-system, when the processed-message-counter is equal to the
received-message-counter, it meas ETL can continue their jobs.

The jira you submitted is not exactly what I want, however it will be great
if we can figure out a common solution to this requirement, although I think
it is difficult unless, as you said, we add some assumption like watermark.
On the other side, I think watermark may be able to archived by use the
combination of inactiveBucketThreashold and batchRolloverInterval already.







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by zhangminglei <18...@163.com>.
Hi, Xilang

You can watch the jira what you referred to. I will work on this in the next couple of days.

Cheers
Minglei

> 在 2018年7月9日,上午9:50,XilangYan <xi...@gmail.com> 写道:
> 
> Hi Febian,
> 
> With watermark, I understand it could only write those that are smaller than
> the received watermark, but could I know why it would also need to maintain
> a write ahead log of all received rows? When an event received, it just
> compare time with current watermark,  write it to correct bucket if smaller
> then watermark, otherwise drop it.
> 
> With with assumption, BucketingSink could close all bucket that is older
> than current watermark, I think it  make sense as those bucket data won't
> change  anymore. The close action could be done in checkpoint callback or
> when every event receive.
> 
> It implemented the BucketReady mechanism @Minglei suggested in
> https://issues.apache.org/jira/browse/FLINK-9609 using current watermark
> mechanism. And I think we don't need BucketWatermark concept, as it confuse
> with current watermark.
> 
> Thanks,
> Xilang
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Hi Febian,

With watermark, I understand it could only write those that are smaller than
the received watermark, but could I know why it would also need to maintain
a write ahead log of all received rows? When an event received, it just
compare time with current watermark,  write it to correct bucket if smaller
then watermark, otherwise drop it.

With with assumption, BucketingSink could close all bucket that is older
than current watermark, I think it  make sense as those bucket data won't
change  anymore. The close action could be done in checkpoint callback or
when every event receive.

It implemented the BucketReady mechanism @Minglei suggested in
https://issues.apache.org/jira/browse/FLINK-9609 using current watermark
mechanism. And I think we don't need BucketWatermark concept, as it confuse
with current watermark.

Thanks,
Xilang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Xilang,

I thought about this again.
The bucketing sink would need to roll on event-time intervals (similar to
the current processing time rolling) which are triggered by watermarks in
order to support consistency.
However, it would also need to maintain a write ahead log of all received
rows and could only write those that are smaller than the received
watermark. This would make the whole sink more complex.

I'm not sure that rolling on checkpoint barriers is a good solution either.
IMO, the checkpointing interval and file rolling interval should not depend
on each other because it mixes different requirements and introduces
challenging trade-offs.

Best, Fabian


2018-07-04 11:59 GMT+02:00 XilangYan <xi...@gmail.com>:

> Hi Fabian,
>
> We did need a consistent view of data, we need the Counter and HDFS file to
> be consistent. For example, when the Counter indicate there is 1000 message
> wrote to the HDFS, there must be exactly 1000 messages in HDFS ready for
> read.
>
> The data we write to HDFS is collected by an Agent(which also send Counter
> message to count message number received), data has a timestamp and we use
> BucktingSink to write data into different bucket.
>
> Could you give me a clue on how to achieve this with watermark. As my
> understanding, watermark is designed to process out-of-order data with a
> know delay, how it can be used to make my CounterSink and BuckingSink
> consistent.
>
> Thanks, Xilang
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Let BucketingSink roll file on each checkpoint

Posted by XilangYan <xi...@gmail.com>.
Hi Fabian,

We did need a consistent view of data, we need the Counter and HDFS file to
be consistent. For example, when the Counter indicate there is 1000 message
wrote to the HDFS, there must be exactly 1000 messages in HDFS ready for
read.

The data we write to HDFS is collected by an Agent(which also send Counter
message to count message number received), data has a timestamp and we use
BucktingSink to write data into different bucket.

Could you give me a clue on how to achieve this with watermark. As my
understanding, watermark is designed to process out-of-order data with a
know delay, how it can be used to make my CounterSink and BuckingSink
consistent. 

Thanks, Xilang



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Let BucketingSink roll file on each checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Xilang,

Let me try to summarize your requirements.
If I understood you correctly, you are not only concerned about the
exactly-once guarantees but also need a consistent view of the data.
The data in all files that are finalized need to originate from a prefix of
the stream, i.e., all records written to the finalized files must have a
timestamp smaller or equal than T and there might not be a record with a
timestamp larger than T.

I think this can be achieved with the current implementation and event-time
processing. If the result of a job is emitted by a timer which is triggered
by watermarks, you will have the prefix property (even taking out-of-order
records into account!).
The reason is that watermarks and checkpoint barriers are shipped like
regular data records. By using timers, all computations are "synchonized"
by the watermarks which cannot be overtaken by checkpoint barriers.

Best, Fabian

2018-07-02 4:22 GMT+02:00 XilangYan <xi...@gmail.com>:

> Thank you Minglei,
>
> I should describe my current flow and my requirement more clearly.
> 1. any data we collect have a send-time
> 2. when collect data, we also send another counter message, says we have
> collect 45 message whose send-time is 2018-07-02 10:02:00
> 3. data is sent to kafka(or other message system), and flink receives data
> from kafka and write to HDFS
> 4. when flink finished part of messages(neither .pending nor .inprogress,
> when "finish" it must be finished state that can be read by other system),
> we send another counter message, says we have processed 40 message whose
> send-time is  2018-07-02 10:02:00
>
> What i have did in flink is :
> 1. I add a config to BucktingSink, the config name is rolloverOnCheckpoint
> 2. I add another sink says CounterSink which counts message by send-time
> 2. in BucktingSink.snapshotState, if rolloverOnCheckpoint is set to true,
> I
> close current files and move them   to pending state
> 3. in CounterSink.snapshotState I prepare to send the special counter
> message
> 4. when checkpoint completed BucktingSink.notifyCheckpointComplete will
> move
> pending files to finish state, CounterSink.notifyCheckpointComplete  will
> send the special counter message
>
> So in our counter-system, when the processed-message-counter is equal to
> the
> received-message-counter, it meas ETL can continue their jobs.
>
> The jira you submitted is not exactly what I want, however it will be great
> if we can figure out a common solution to this requirement, although I
> think
> it is difficult unless, as you said, we add some assumption like watermark.
> On the other side, I think watermark may be able to archived by use the
> combination of inactiveBucketThreashold and batchRolloverInterval already.
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>