Posted to user@flink.apache.org by Bruno Aranda <ba...@apache.org> on 2019/03/29 15:21:57 UTC

StreamingFileSink seems to be overwriting existing part files

Hi,

One of the main reasons we moved to version 1.7 (and 1.7.2 in particular)
was the possibility of using a StreamingFileSink with S3.

We've configured a StreamingFileSink to use a DateTimeBucketAssigner to
bucket by day. It's got a parallelism of 1 and is writing to S3 from an EMR
cluster in AWS.
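
For reference, the setup is roughly along the lines of the sketch below
(this is not our actual code; the bucket path, date format and class/job
names are just placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

    public class DailyBucketSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source; the real job reads from its own upstream operators.
            DataStream<String> events = env.fromElements("event-1", "event-2");

            // Row-format sink writing to S3, bucketed by day via DateTimeBucketAssigner.
            StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("s3://example-bucket/output"),
                                  new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                    .build();

            events.addSink(sink).setParallelism(1);
            env.execute("daily-bucket-sink");
        }
    }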

We ran the job and, after a few hours of activity, manually cancelled it
through the JobManager API. After confirming that a number of "part-0-x"
files existed in S3 at the expected path, we then started the job again
using the same invocation of the CLI "flink run..." command that was
originally used to start it.
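
To make the sequence concrete, it was roughly the following (the job id,
jar and entry class here are placeholders, not our real values):

    curl -X PATCH "http://<jobmanager-host>:8081/jobs/<job-id>?mode=cancel"   # manual cancel via the REST API, no savepoint taken
    flink run -d -c com.example.MyStreamingJob my-job.jar                      # resubmitted with the same command as the first run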

It started writing data to S3 again, starting afresh from "part-0-0", which
gradually overwrote the existing data.

I can understand that not having used a checkpoint gives no indication of
where to resume, but the fact that it overwrites the existing files (as it
starts writing to part-0-0 again) is surprising. One would expect that it
finds the last part and gets the next free number?

We're definitely using the flink-s3-fs-hadoop-1.7.2.jar and don't have the
presto version on the classpath.

Is this the expected behaviour? We have not seen this in the non-streaming
versions of the sink.

Best regards,

Bruno

Re: StreamingFileSink seems to be overwriting existing part files

Posted by Kostas Kloudas <kk...@gmail.com>.
No problem!

Cheers,
Kostas


Re: StreamingFileSink seems to be overwriting existing part files

Posted by Bruno Aranda <ba...@apache.org>.
Hi Kostas,

Put that way, sounds fair enough. Many thanks for the clarification,

Cheers,

Bruno


Re: StreamingFileSink seems to be overwriting existing part files

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Bruno,

This is the expected behaviour as the job starts "fresh", given that you
did not specify any savepoint/checkpoint to start from.

As for the note that "One would expect that it finds the last part and gets
the next free number?",
I am not sure how this could be achieved safely and efficiently in an
eventually consistent object store like S3.
This is actually the reason why, unlike the BucketingSink, the
StreamingFileSink relies on Flink's own state to determine the "next" part
counter.
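
To make that concrete (just a sketch; the checkpoint interval, savepoint
directory and ids below are placeholders): with checkpointing enabled in
the job, e.g.

    env.enableCheckpointing(60_000); // snapshots the sink's bucket state, including the part counter

that counter is part of what gets restored, so a stop-and-resume that keeps
the numbering would look like

    flink cancel -s s3://<savepoint-dir> <job-id>
    flink run -s s3://<savepoint-dir>/<savepoint-id> ...

rather than a plain cancel followed by a fresh "flink run".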

Cheers,
Kostas
