Posted to dev@flink.apache.org by Enrico Agnoli <en...@workday.com> on 2019/10/15 13:47:05 UTC

performances of S3 writing with many buckets in parallel

I'm starting this discussion here after an initial conversation with the Ververica and AWS teams at Flink Forward.
I'm investigating the performance of a Flink job that moves data from Kafka to an S3 sink.
We are using a BucketingSink to write Parquet files. The bucketing logic splits the messages into one folder per data type, tenant (customer), date-time, extraction ID, etc. As a result, each file is stored in a folder structure 9-10 levels deep (s3_bucket:/1/2/3/4/5/6/7/8/9/myFile...)

If the data arrives in bursts of messages per tenant and type, write performance is good. But when the data is spread more like white noise across thousands of tenants, dozens of data types and multiple extraction IDs, performance drops dramatically (on the order of 300x).
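
As a rough illustration of why the two arrival patterns differ, here is a toy model (not a measurement, and not Flink code) that counts how many distinct buckets a window of consecutive records touches. The record count, bucket count, burst length and window size are made-up numbers chosen only for the example.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model: how many buckets are "hot" within a window of consecutive records. */
final class OpenBucketCount {

    /** Distinct bucket ids among `window` consecutive records starting at `start`. */
    static int distinctBuckets(int[] bucketIds, int start, int window) {
        Set<Integer> seen = new HashSet<>();
        for (int i = start; i < start + window && i < bucketIds.length; i++) {
            seen.add(bucketIds[i]);
        }
        return seen.size();
    }

    /** Bursty arrival: runs of `burstLength` records all go to the same bucket. */
    static int[] bursty(int records, int buckets, int burstLength) {
        int[] ids = new int[records];
        for (int i = 0; i < records; i++) {
            ids[i] = (i / burstLength) % buckets;
        }
        return ids;
    }

    /** "White noise" stand-in: consecutive records always hit different buckets. */
    static int[] interleaved(int records, int buckets) {
        int[] ids = new int[records];
        for (int i = 0; i < records; i++) {
            ids[i] = i % buckets;
        }
        return ids;
    }

    public static void main(String[] args) {
        int records = 10_000, buckets = 1_000, window = 1_000;
        System.out.println(distinctBuckets(bursty(records, buckets, 500), 0, window)); // 2
        System.out.println(distinctBuckets(interleaved(records, buckets), 0, window)); // 1000
    }
}
```

With the same number of records, the interleaved stream needs a part file open in every bucket at once, while the bursty stream only ever needs a couple.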

With a debugger attached, the issue appears to be related to the number of handlers open at the same time on S3 to write data. More specifically:
https://jira2.workday.com/secure/attachment/2947228/2947228_image-2019-06-23-22-46-43-980.png

While researching the Hadoop libraries used to write to S3, I found some settings that looked like possible improvements:
      <name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>
But none of these made a big difference in throughput.

I hope to move the discussion forward and see whether we can find a clear issue in the logic or a possible workaround.

Note: the tests were run on Flink 1.8 with the Hadoop FileSystem (BucketingSink).


Re: performances of S3 writing with many buckets in parallel

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Enrico,

Nice to hear from you and thanks for checking it out!

This can be helpful for people using the BucketingSink, but I would
recommend that you switch to the StreamingFileSink, which is the "new
version" of the BucketingSink. In fact, the BucketingSink is going to
be removed in one of the following releases, as it has been deprecated
for quite a while.

If you try the StreamingFileSink, let us know if the problem persists.

Cheers,
Kostas



Re: performances of S3 writing with many buckets in parallel

Posted by Enrico Agnoli <en...@workday.com>.
I finally found the time to dig a little more into this and found the real problem.
The cause of the slowdown is this piece of code:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

This check alone takes around 4-5 seconds, out of about 6 seconds in total to open the file. Logs from an instrumented call:
2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

This, together with the BucketingSink default of a 60-second inactivity rollover
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195
means that with more than 10 parallel buckets on a slot, by the time we finish creating the last bucket the first one has become stale and needs to be rolled over, which creates a blocking situation.

We solved this by removing the FS check mentioned above (file opening now takes ~1.2 sec) and setting the default inactive threshold to 5 minutes. With these changes we can easily handle more than 200 buckets per slot (once the job gets up to speed it ingests on all the slots, which postpones the inactive timeout).

-Enrico
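
The arithmetic behind the "more than 10 buckets" and "more than 200 buckets" figures above can be sketched as follows. This is a back-of-the-envelope model, not Flink code: it assumes part files in a slot are opened one after another and that the first bucket receives no further records while the others are being opened. The class and method names are hypothetical.

```java
import java.time.Duration;

/** Back-of-the-envelope model of the stall described above; not Flink code. */
final class BucketStalenessEstimate {

    /**
     * Number of part files one slot can open back to back before the first
     * bucket has been idle for the whole inactivity threshold.
     * Assumes sequential opens and no further writes to the first bucket.
     */
    static long maxBucketsBeforeFirstGoesStale(Duration inactivityThreshold, Duration openTime) {
        return inactivityThreshold.toMillis() / openTime.toMillis();
    }

    public static void main(String[] args) {
        // Reported defaults: about 6 s per open, 60 s inactivity threshold.
        System.out.println(maxBucketsBeforeFirstGoesStale(
                Duration.ofSeconds(60), Duration.ofSeconds(6)));   // 10

        // After the change: about 1.2 s per open, 5 min inactivity threshold.
        System.out.println(maxBucketsBeforeFirstGoesStale(
                Duration.ofMinutes(5), Duration.ofMillis(1200)));  // 250
    }
}
```

Under these assumptions the budget goes from roughly 10 to roughly 250 buckets per slot, which is in line with the numbers reported in the message.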

Re: performances of S3 writing with many buckets in parallel

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Enrico,

Thanks for opening the discussion!

One thing to note that may help is that the Hadoop S3 FS tries to
imitate a filesystem on top of S3:

- before writing a key it checks if the "parent directory" exists by
  checking for a key with the prefix up to the last "/"
- it creates empty marker files to mark the existence of such a parent
  directory

All these "existence" requests are S3 HEAD requests, which are
expensive. As a result the Hadoop S3 FS has very high "create file"
latency and it hits request rate limits very quickly.
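
To get a feel for how a deep path multiplies the "directories" such a layer has to reason about, here is a small sketch that lists the parent prefixes of a single object key. It is not the Hadoop S3A implementation; which of these prefixes are actually probed or marked, and when, depends on the Hadoop version and the operation, so treat the count as an upper-bound illustration. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: enumerates the "directory" prefixes of an S3 object key. */
final class ParentPrefixProbe {

    /** Returns every prefix of `key` that ends in '/', deepest first. */
    static List<String> parentPrefixes(String key) {
        List<String> prefixes = new ArrayList<>();
        int slash = key.lastIndexOf('/');
        while (slash > 0) {
            prefixes.add(key.substring(0, slash + 1));
            slash = key.lastIndexOf('/', slash - 1);
        }
        return prefixes;
    }

    public static void main(String[] args) {
        // Same shape as the 9-level layout described in the original message.
        List<String> prefixes = parentPrefixes("1/2/3/4/5/6/7/8/9/myFile");
        System.out.println(prefixes.size()); // 9
        System.out.println(prefixes.get(0)); // 1/2/3/4/5/6/7/8/9/
    }
}
```

Each new bucket adds its own set of prefixes, so thousands of distinct buckets translate into a large number of candidate existence checks.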

This may not play well with your directory structure. The fact that it
works well when messages for a specific dir come in bursts may be
explained by some form of internal caching done by Hadoop, but I am
not sure.

In any case, it may also be helpful to post your findings to the
Hadoop community to see if they have any answers.

Finally, two recommendations:
 1) try using presto-s3 instead of Hadoop, as presto seems to be
more efficient in that regard, and please report if you notice any
changes
 2) try to move to the StreamingFileSink, as the BucketingSink is
already deprecated (although presto-s3 is not supported by the
StreamingFileSink)

Cheers,
Kostas
