Posted to user@flink.apache.org by Anton Parkhomenko <ma...@chuwy.me> on 2019/09/09 10:40:38 UTC

StreamingFileSink rolling callback

Hello,

I’m writing a Flink job that reads heterogeneous data (one row contains several
types that need to be partitioned downstream) from AWS Kinesis and
writes it to an S3 directory structure like s3://bucket/year/month/day/hour/type.
This all works great with StreamingFileSink in Flink 1.9, but the problem is
that I need to immediately (or rather “as soon as possible”) let
another application know when the “hour” bucket has rolled (i.e. we’re 100%
sure it won’t write any more data for this hour). Another problem is that
the data can be very skewed in types, e.g. one hour can contain 90% of rows
with typeA, 30% of rows with typeB and 1% of rows with typeC.

My current plan is to:

1. Split the stream into windows using TumblingProcessingTimeWindows (I don’t
care about event time at all)
2. Assign every row its bucket in a windowing function
3. Write a stateful BucketAssigner that:
3.1. Keeps its last window in a mutable variable
3.2. Once it receives a row with a newer window, sends a message to SQS and
increments the window
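The rollover logic in steps 3.1/3.2 boils down to something like the following sketch. It compiles without Flink on the classpath; the onWindowRolled callback standing in for the SQS publish is my own placeholder, and in a real BucketAssigner this logic would have to live inside getBucketId():

```java
import java.util.function.Consumer;

class RollingBucketTracker {
    private long currentWindowHour = Long.MIN_VALUE; // step 3.1: last window seen
    private final Consumer<Long> onWindowRolled;     // stand-in for the SQS publish

    RollingBucketTracker(Consumer<Long> onWindowRolled) {
        this.onWindowRolled = onWindowRolled;
    }

    /** Returns the bucket id (hour) for a row timestamp, firing the callback
     *  the first time a row from a newer hour window is observed (step 3.2).
     *  Rows from older hours are bucketed as-is and never fire the callback. */
    long assignBucket(long epochMillis) {
        long hour = epochMillis / 3_600_000L;
        if (currentWindowHour != Long.MIN_VALUE && hour > currentWindowHour) {
            onWindowRolled.accept(currentWindowHour); // previous hour is done
        }
        if (hour > currentWindowHour) {
            currentWindowHour = hour;
        }
        return hour;
    }
}
```

Note the callback fires when the first row of the next hour arrives, not when the hour actually ends, which is exactly the coupling with state and side effects that worries me below.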

My biggest concern now is about the 3rd point. To me, BucketAssigner looks
like a pure function of (Row, Time) -> Bucket and I’m not sure that
introducing state and a side effect there would be reasonable. Is there any
other way to do it? I’m also thinking about how I should couple this with the
checkpointing mechanism, as ideally I’d like to not invoke this callback
before the checkpoint is written.

StreamingFileSink does not provide many ways to extend it. I tried to
re-implement it for my purposes, but stumbled upon many private methods and
classes, so even though it looks possible, the end result would probably be
too ugly.

To make things a little bit easier, I don’t care too much about the delivery
semantics of those final SQS messages: if I get only ~99% of them, that’s
fine, and if some of them are duplicated, that’s also fine.

Regards,
Anton

Re: StreamingFileSink rolling callback

Posted by Kostas Kloudas <kk...@apache.org>.
Hi Anton,

First of all, there is this PR
https://github.com/apache/flink/pull/9581 that may be interesting to
you.

Second, I think you have to keep in mind that the hourly bucket
reporting will be per-subtask. So if you have a parallelism of 4, each
of the 4 subtasks will report individually that it is done with, e.g.,
hour 10, and it is up to the receiving end to know whether it should
wait for more or not. This may be a problem for your stateful assigner
approach, as the assigner cannot know by default which subtask it
belongs to. If, for example, you have a parallelism of 1, then your
stateful assigner approach could work, although it suffers from the
problem you also mentioned: it is not integrated with checkpointing
(so a part file may be "reverted"), and a file may roll without that
meaning the previous one has already been written to the FS.
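To illustrate what the receiving end would have to do with per-subtask reports: with parallelism P it must count distinct subtask reports before treating an hour as closed. A rough sketch (the class and method names are illustrative, not part of any Flink API):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class HourReportAggregator {
    private final int parallelism;
    // hour -> set of subtask indices that reported completion of that hour
    private final Map<Long, Set<Integer>> reports = new HashMap<>();

    HourReportAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Records one subtask's "done with hour" message; returns true once
     *  all subtasks have reported that hour. Duplicate messages are
     *  harmless, which fits the relaxed SQS delivery semantics you
     *  described. */
    boolean report(long hour, int subtaskIndex) {
        Set<Integer> seen = reports.computeIfAbsent(hour, h -> new HashSet<>());
        seen.add(subtaskIndex);
        return seen.size() == parallelism;
    }
}
```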

Third, a solution could be that, instead of having the job itself
push notifications that a part file has rolled (which may suffer from
the problem that a part file may roll while the FS takes some time
to write everything to disk), you simply monitor the FS directory
where you are writing your buckets and parse the part file
names in order to know that all subtasks have finished with hour X.
This can be done by another job, which would also put the notifications
to SQS. I think that this would also address your concern: "I’m also
thinking on how I should couple this with checkpointing mechanism as
ideally I’d like to not invoke this callback before checkpoint is
written."
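As a rough sketch of the parsing step: the monitoring job lists the keys under the hour's prefix and checks what the part files say. The completion rule used here (every subtask 0..P-1 has at least one finalized part file and no in-progress files remain) and the "part-<subtask>-<counter>" / ".inprogress" naming are assumptions that depend on your sink configuration; a more robust rule would wait until each subtask has rolled into a later hour.

```java
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class HourCompletionChecker {
    // Finalized part files assumed to end in part-<subtask>-<counter>;
    // in-progress files assumed to carry an ".inprogress" marker.
    private static final Pattern PART = Pattern.compile("part-(\\d+)-\\d+$");

    /** Decides whether an hour's prefix looks complete, given all object
     *  keys under it and the job parallelism. */
    static boolean hourLooksComplete(List<String> keysForHour, int parallelism) {
        Set<Integer> finishedSubtasks = new HashSet<>();
        for (String key : keysForHour) {
            if (key.contains(".inprogress")) {
                return false; // some subtask is still writing this hour
            }
            Matcher m = PART.matcher(key);
            if (m.find()) {
                finishedSubtasks.add(Integer.parseInt(m.group(1)));
            }
        }
        return finishedSubtasks.size() == parallelism;
    }
}
```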

Cheers,
Kostas
