Posted to user@flink.apache.org by "Woessner, Leo" <le...@pearson.com> on 2019/06/02 15:22:56 UTC

Re: Limitations in StreamingFileSink BulkFormat

I have created a process *outside* of Flink for this.  It would be nice
to use Flink for it, though.

It is important to us that we only checkpoint after the records have been
successfully saved in S3.
This is to ensure that all records are preserved during a node failure.
The process I wrote appends records to a file on disk; when the size or
time threshold is passed, the file is written to S3.  Only then is the
checkpoint written.
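
For concreteness, this is roughly the shape of what I would hope to write
as a custom Flink sink (untested sketch; S3Uploader is a made-up stand-in
for our existing upload code):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffer records and flush them to S3 inside snapshotState(), so a
// checkpoint can only complete after the upload has succeeded.
public class BufferingS3Sink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // collect records between checkpoints
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // If the upload throws, the checkpoint fails and Flink replays
        // from the previous checkpoint, so no records are lost.
        S3Uploader.upload(buffer); // stand-in for our upload code
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // A production version would also keep the buffer in operator
        // state so records survive a restart between checkpoints.
    }
}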

Are these semantics possible in Flink?

On Fri, May 31, 2019 at 4:59 AM Timothy Victor <vi...@gmail.com> wrote:

> Not an expert, but I would think this will not be trivial, since the
> reason for rolling on checkpoints is to guarantee exactly-once semantics
> in the event of a failure, and that guarantee is tightly integrated into
> the checkpointing mechanism.  The precursor to the StreamingFileSink was
> the BucketingSink, which I believe did give some control over when to
> save, but it also suffered from duplicates in the files.  I vaguely
> recall reading a Flink blog post on this, but I can't recall it right now.
>
> I sort of have the same desire, but I worked around it by periodically
> merging Parquet files (doable as long as the schema is the same).  This
> runs out of process, of course.
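>
> Roughly like this, for illustration (untested sketch; the file list,
> output path, and shared Avro schema are assumed as inputs):
>
> import java.io.IOException;
> import java.util.List;
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetReader;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.hadoop.ParquetReader;
> import org.apache.parquet.hadoop.ParquetWriter;
>
> // Merge many small Parquet files into one, schema permitting.
> static void merge(List<Path> smallFiles, Path merged, Schema schema)
>         throws IOException {
>     try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
>             .<GenericRecord>builder(merged)
>             .withSchema(schema)
>             .build()) {
>         for (Path file : smallFiles) {
>             try (ParquetReader<GenericRecord> reader =
>                     AvroParquetReader.<GenericRecord>builder(file).build()) {
>                 GenericRecord record;
>                 while ((record = reader.read()) != null) {
>                     writer.write(record); // copy each record across
>                 }
>             }
>         }
>     }
> }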
>
> Tim
>
> On Fri, May 31, 2019, 4:36 AM Ayush Verma <ay...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using the StreamingFileSink bulk format in my Flink stream-processing
>> job to write Parquet files to S3.  The
>> StreamingFileSink.BulkFormatBuilder
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.BulkFormatBuilder.html>
>> does not have an option to set a custom rolling policy; it rolls the
>> files whenever a checkpoint triggers
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/rollingpolicies/OnCheckpointRollingPolicy.html>.
>> It would be better to have a rolling policy based on both *size* and
>> *time*.  One option is to write our own version of StreamingFileSink
>> that does accept a custom rolling policy, but I suspect there is some
>> reason for the current behaviour.
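>>
>> For comparison, the row-format builder does accept one; something like
>> this works there (untested sketch against the 1.8 API; the bucket path
>> is made up):
>>
>> import java.util.concurrent.TimeUnit;
>>
>> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>> import org.apache.flink.core.fs.Path;
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>> import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
>>
>> // Size- and time-based rolling, which forRowFormat accepts.
>> StreamingFileSink<String> sink = StreamingFileSink
>>     .forRowFormat(new Path("s3://bucket/out"),
>>             new SimpleStringEncoder<String>("UTF-8"))
>>     .withRollingPolicy(DefaultRollingPolicy.create()
>>         .withMaxPartSize(128L * 1024 * 1024)                 // roll at 128 MB
>>         .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) // or 15 minutes
>>         .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
>>         .build())
>>     .build();
>>
>> The bulk-format builder, by contrast, pins the policy to
>> OnCheckpointRollingPolicy.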
>> I would like to get the opinion of Flink experts on this, and to hear
>> whether there are any potential workarounds for getting the desired
>> behaviour.
>>
>> Kind regards
>> Ayush Verma
>>
>

-- 
*Leo Woessner*
*Domain Engineering*
*Pearson Education*