You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Kamil Dziublinski <ka...@gmail.com> on 2017/03/29 09:27:29 UTC

20 times higher throughput with Window function vs fold function, intended?

Hi guys,

I’m using flink on production in Mapp. We recently swapped from storm.
Before I have put this live I was doing performance tests and I found
something that “feels” a bit off.
I have a simple streaming job reading from kafka, doing window for 3
seconds and then storing into hbase.

Initially we had this second step written with a fold function, since I
thought performance and resource wise it’s a better idea.
But I couldn’t reach more than 120k writes per second to HBase and I
thought hbase sink is a bottlenck here. But then I tried doing the same
with window function and my performance jumped to 2 millions writes per
second. Just wow :) Comparing to storm where I had max 320k per second it
is amazing.

Both fold and window functions were doing the same thing, taking together
all the records for the same tenant and user (key by is used for that) and
putting it in one batched object with arraylists for the mutations on user
profile. After that passing this object to the sink. I can post the code if
its needed.

In case of fold I was just adding profile mutation to the list and in case
of window function iterating over all of it and returning this batched
entity in one go.

I’m wondering if this is expected to have 20 times slower performance just
by using fold function. I would like to know what is so costly about this,
as intuitively I would expect fold function being a better choice here
since I assume that window function is using more memory for buffering.

Also my colleagues when they were doing PoC on flink evaluation they were
seeing very similar results to what I am seeing now. But they were still
using fold function. This was on flink version 1.0.3 and now I am using
1.2.0. So perhaps there is some regression?

Please let me know what you think.

Cheers,
Kamil.

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Kamil Dziublinski <ka...@gmail.com>.
yep I meant 1200000 per second :)

On Fri, Mar 31, 2017 at 11:19 AM, Ted Yu <yu...@gmail.com> wrote:

> The 1,2million seems to be European notation.
>
> You meant 1.2 million, right ?
>
> On Mar 31, 2017, at 1:19 AM, Kamil Dziublinski <
> kamil.dziublinski@gmail.com> wrote:
>
> Hi,
>
> Thanks for the tip man. I tried playing with this.
> Was changing fetch.message.max.bytes (I still have 0.8 kafka) and
> also socket.receive.buffer.bytes. With some optimal settings I was able
> to get to 1,2 million reads per second. So 50% increase.
> But that unfortunately does not increase when I enable hbase sink again.
> So it means that backpressure kicks in and hbase writing is here limiting
> factor. I will try to tweak this a bit more if I find something I will
> share.
>
> Cheers,
> Kamil.
>
> On Thu, Mar 30, 2017 at 12:45 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org
> > wrote:
>
>> I'm wondering what I can tweak further to increase this. I was reading in
>> this blog: https://data-artisans.com/extending-the-yahoo-streamin
>> g-benchmark/
>> about 3 millions per sec with only 20 partitions. So i'm sure I should be
>> able to squeeze out more out of it.
>>
>>
>> Not really sure if it is relevant under the context of your case, but you
>> could perhaps try tweaking the maximum size of Kafka records fetched on
>> each poll on the partitions.
>> You can do this by setting a higher value for “max.partition.fetch.bytes”
>> in the provided config properties when instantiating the consumer; that
>> will directly configure the internal Kafka clients.
>> Generally, all Kafka settings are applicable through the provided config
>> properties, so you can perhaps take a look at the Kafka docs to see what
>> else there is to tune for the clients.
>>
>> On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (
>> kamil.dziublinski@gmail.com) wrote:
>>
>> I'm wondering what I can tweak further to increase this. I was reading in
>> this blog: https://data-artisans.com/extending-the-yahoo-streamin
>> g-benchmark/
>> about 3 millions per sec with only 20 partitions. So i'm sure I should be
>> able to squeeze out more out of it.
>>
>>
>

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Ted Yu <yu...@gmail.com>.
The 1,2million seems to be European notation. 

You meant 1.2 million, right ?

> On Mar 31, 2017, at 1:19 AM, Kamil Dziublinski <ka...@gmail.com> wrote:
> 
> Hi,
> 
> Thanks for the tip man. I tried playing with this.
> Was changing fetch.message.max.bytes (I still have 0.8 kafka) and also socket.receive.buffer.bytes. With some optimal settings I was able to get to 1,2 million reads per second. So 50% increase. 
> But that unfortunately does not increase when I enable hbase sink again. So it means that backpressure kicks in and hbase writing is here limiting factor. I will try to tweak this a bit more if I find something I will share.
> 
> Cheers,
> Kamil.
> 
> On Thu, Mar 30, 2017 at 12:45 PM, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
>>> I'm wondering what I can tweak further to increase this. I was reading in this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>>> about 3 millions per sec with only 20 partitions. So i'm sure I should be able to squeeze out more out of it.
>> 
>> 
>> Not really sure if it is relevant under the context of your case, but you could perhaps try tweaking the maximum size of Kafka records fetched on each poll on the partitions.
>> You can do this by setting a higher value for “max.partition.fetch.bytes” in the provided config properties when instantiating the consumer; that will directly configure the internal Kafka clients.
>> Generally, all Kafka settings are applicable through the provided config properties, so you can perhaps take a look at the Kafka docs to see what else there is to tune for the clients.
>> 
>>> On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (kamil.dziublinski@gmail.com) wrote:
>>> 
>>> I'm wondering what I can tweak further to increase this. I was reading in this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
>>> about 3 millions per sec with only 20 partitions. So i'm sure I should be able to squeeze out more out of it.
> 

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Kamil Dziublinski <ka...@gmail.com>.
Hi,

Thanks for the tip man. I tried playing with this.
Was changing fetch.message.max.bytes (I still have 0.8 kafka) and
also socket.receive.buffer.bytes. With some optimal settings I was able to
get to 1,2 million reads per second. So 50% increase.
But that unfortunately does not increase when I enable hbase sink again. So
it means that backpressure kicks in and hbase writing is here limiting
factor. I will try to tweak this a bit more if I find something I will
share.

Cheers,
Kamil.

On Thu, Mar 30, 2017 at 12:45 PM, Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> I'm wondering what I can tweak further to increase this. I was reading in
> this blog: https://data-artisans.com/extending-the-yahoo-
> streaming-benchmark/
> about 3 millions per sec with only 20 partitions. So i'm sure I should be
> able to squeeze out more out of it.
>
>
> Not really sure if it is relevant under the context of your case, but you
> could perhaps try tweaking the maximum size of Kafka records fetched on
> each poll on the partitions.
> You can do this by setting a higher value for “max.partition.fetch.bytes”
> in the provided config properties when instantiating the consumer; that
> will directly configure the internal Kafka clients.
> Generally, all Kafka settings are applicable through the provided config
> properties, so you can perhaps take a look at the Kafka docs to see what
> else there is to tune for the clients.
>
> On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (
> kamil.dziublinski@gmail.com) wrote:
>
> I'm wondering what I can tweak further to increase this. I was reading in
> this blog: https://data-artisans.com/extending-the-yahoo-
> streaming-benchmark/
> about 3 millions per sec with only 20 partitions. So i'm sure I should be
> able to squeeze out more out of it.
>
>

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
I'm wondering what I can tweak further to increase this. I was reading in this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able to squeeze out more out of it.

Not really sure if it is relevant under the context of your case, but you could perhaps try tweaking the maximum size of Kafka records fetched on each poll on the partitions.
You can do this by setting a higher value for “max.partition.fetch.bytes” in the provided config properties when instantiating the consumer; that will directly configure the internal Kafka clients.
Generally, all Kafka settings are applicable through the provided config properties, so you can perhaps take a look at the Kafka docs to see what else there is to tune for the clients.

On March 30, 2017 at 6:11:27 PM, Kamil Dziublinski (kamil.dziublinski@gmail.com) wrote:

I'm wondering what I can tweak further to increase this. I was reading in this blog: https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be able to squeeze out more out of it.

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Kamil Dziublinski <ka...@gmail.com>.
Thanks Ted, will read about it.

While we are on throughput.
Do you guys have any suggestion on how to optimise kafka reading from
flink?
In my current setup:
Flink is on 15 machines on yarn
Kafka on 9 brokers with 40 partitions. Source parallelism is 40 for flink,
And just for testing I left only filters there without sink to see max
throughput.
I am getting max 800-900k per sec. And definitely not utilising 1gb/s
network. Im more or less utilising only 20-30% of network bandwith.

I'm wondering what I can tweak further to increase this. I was reading in
this blog:
https://data-artisans.com/extending-the-yahoo-streaming-benchmark/
about 3 millions per sec with only 20 partitions. So i'm sure I should be
able to squeeze out more out of it.

On Thu, Mar 30, 2017 at 11:51 AM, Ted Yu <yu...@gmail.com> wrote:

> Kamil:
> In the upcoming hbase 2.0 release, there are more write path optimizations
> which would boost write performance further.
>
> FYI
>
> On Mar 30, 2017, at 1:07 AM, Kamil Dziublinski <
> kamil.dziublinski@gmail.com> wrote:
>
> Hey guys,
>
> Sorry for confusion it turned out that I had a bug in my code, when I was
> not clearing this list in my batch object on each apply call. Forgot it has
> to be added since its different than fold.
> Which led to so high throughput. When I fixed this I was back to 160k per
> sec. I'm still investigating how I can speed it up.
>
> As a side note its quite interesting that hbase was able to do 2millions
> puts per second. But most of them were already stored with previous call so
> perhaps internally he is able to distinguish in memory if a put was stored
> or not. Not sure.
>
> Anyway my claim about window vs fold performance difference was wrong. So
> forget about it ;)
>
> On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther <tw...@apache.org> wrote:
>
>> Hi Kamil,
>>
>> the performance implications might be the result of which state the
>> underlying functions are using internally. WindowFunctions use ListState or
>> ReducingState, fold() uses FoldingState. It also depends on the size of
>> your state and the state backend you are using. I recommend the following
>> documentation page. The FoldingState might be deprecated soon, once a
>> better alternative is available: https://ci.apache.org/projects
>> /flink/flink-docs-release-1.2/dev/stream/state.html#using-
>> managed-keyed-state
>>
>> I hope that helps.
>>
>> Regards,
>> Timo
>>
>> Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
>>
>> Hi guys,
>>
>> I’m using flink on production in Mapp. We recently swapped from storm.
>> Before I have put this live I was doing performance tests and I found
>> something that “feels” a bit off.
>> I have a simple streaming job reading from kafka, doing window for 3
>> seconds and then storing into hbase.
>>
>> Initially we had this second step written with a fold function, since I
>> thought performance and resource wise it’s a better idea.
>> But I couldn’t reach more than 120k writes per second to HBase and I
>> thought hbase sink is a bottlenck here. But then I tried doing the same
>> with window function and my performance jumped to 2 millions writes per
>> second. Just wow :) Comparing to storm where I had max 320k per second it
>> is amazing.
>>
>> Both fold and window functions were doing the same thing, taking together
>> all the records for the same tenant and user (key by is used for that) and
>> putting it in one batched object with arraylists for the mutations on user
>> profile. After that passing this object to the sink. I can post the code if
>> its needed.
>>
>> In case of fold I was just adding profile mutation to the list and in
>> case of window function iterating over all of it and returning this batched
>> entity in one go.
>>
>> I’m wondering if this is expected to have 20 times slower performance
>> just by using fold function. I would like to know what is so costly about
>> this, as intuitively I would expect fold function being a better choice
>> here since I assume that window function is using more memory for buffering.
>>
>> Also my colleagues when they were doing PoC on flink evaluation they were
>> seeing very similar results to what I am seeing now. But they were still
>> using fold function. This was on flink version 1.0.3 and now I am using
>> 1.2.0. So perhaps there is some regression?
>>
>> Please let me know what you think.
>>
>> Cheers,
>> Kamil.
>>
>>
>>
>

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Ted Yu <yu...@gmail.com>.
Kamil:
In the upcoming hbase 2.0 release, there are more write path optimizations which would boost write performance further. 

FYI 

> On Mar 30, 2017, at 1:07 AM, Kamil Dziublinski <ka...@gmail.com> wrote:
> 
> Hey guys,
> 
> Sorry for confusion it turned out that I had a bug in my code, when I was not clearing this list in my batch object on each apply call. Forgot it has to be added since its different than fold.
> Which led to so high throughput. When I fixed this I was back to 160k per sec. I'm still investigating how I can speed it up.
> 
> As a side note its quite interesting that hbase was able to do 2millions puts per second. But most of them were already stored with previous call so perhaps internally he is able to distinguish in memory if a put was stored or not. Not sure.
> 
> Anyway my claim about window vs fold performance difference was wrong. So forget about it ;)
> 
>> On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther <tw...@apache.org> wrote:
>> Hi Kamil,
>> 
>> the performance implications might be the result of which state the underlying functions are using internally. WindowFunctions use ListState or ReducingState, fold() uses FoldingState. It also depends on the size of your state and the state backend you are using. I recommend the following documentation page. The FoldingState might be deprecated soon, once a better alternative is available: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state
>> 
>> I hope that helps.
>> 
>> Regards,
>> Timo
>> 
>>> Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
>>> Hi guys,
>>> 
>>> I’m using flink on production in Mapp. We recently swapped from storm.
>>> Before I have put this live I was doing performance tests and I found something that “feels” a bit off.
>>> I have a simple streaming job reading from kafka, doing window for 3 seconds and then storing into hbase.
>>> 
>>> Initially we had this second step written with a fold function, since I thought performance and resource wise it’s a better idea. 
>>> But I couldn’t reach more than 120k writes per second to HBase and I thought hbase sink is a bottlenck here. But then I tried doing the same with window function and my performance jumped to 2 millions writes per second. Just wow :) Comparing to storm where I had max 320k per second it is amazing.
>>> 
>>> Both fold and window functions were doing the same thing, taking together all the records for the same tenant and user (key by is used for that) and putting it in one batched object with arraylists for the mutations on user profile. After that passing this object to the sink. I can post the code if its needed. 
>>> 
>>> In case of fold I was just adding profile mutation to the list and in case of window function iterating over all of it and returning this batched entity in one go.
>>> 
>>> I’m wondering if this is expected to have 20 times slower performance just by using fold function. I would like to know what is so costly about this, as intuitively I would expect fold function being a better choice here since I assume that window function is using more memory for buffering.
>>> 
>>> Also my colleagues when they were doing PoC on flink evaluation they were seeing very similar results to what I am seeing now. But they were still using fold function. This was on flink version 1.0.3 and now I am using 1.2.0. So perhaps there is some regression?
>>> 
>>> Please let me know what you think.
>>> 
>>> Cheers,
>>> Kamil.
> 

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Kamil Dziublinski <ka...@gmail.com>.
Hey guys,

Sorry for confusion it turned out that I had a bug in my code, when I was
not clearing this list in my batch object on each apply call. Forgot it has
to be added since its different than fold.
Which led to so high throughput. When I fixed this I was back to 160k per
sec. I'm still investigating how I can speed it up.

As a side note its quite interesting that hbase was able to do 2millions
puts per second. But most of them were already stored with previous call so
perhaps internally he is able to distinguish in memory if a put was stored
or not. Not sure.

Anyway my claim about window vs fold performance difference was wrong. So
forget about it ;)

On Wed, Mar 29, 2017 at 12:21 PM, Timo Walther <tw...@apache.org> wrote:

> Hi Kamil,
>
> the performance implications might be the result of which state the
> underlying functions are using internally. WindowFunctions use ListState or
> ReducingState, fold() uses FoldingState. It also depends on the size of
> your state and the state backend you are using. I recommend the following
> documentation page. The FoldingState might be deprecated soon, once a
> better alternative is available: https://ci.apache.org/
> projects/flink/flink-docs-release-1.2/dev/stream/state.
> html#using-managed-keyed-state
>
> I hope that helps.
>
> Regards,
> Timo
>
> Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
>
> Hi guys,
>
> I’m using flink on production in Mapp. We recently swapped from storm.
> Before I have put this live I was doing performance tests and I found
> something that “feels” a bit off.
> I have a simple streaming job reading from kafka, doing window for 3
> seconds and then storing into hbase.
>
> Initially we had this second step written with a fold function, since I
> thought performance and resource wise it’s a better idea.
> But I couldn’t reach more than 120k writes per second to HBase and I
> thought hbase sink is a bottlenck here. But then I tried doing the same
> with window function and my performance jumped to 2 millions writes per
> second. Just wow :) Comparing to storm where I had max 320k per second it
> is amazing.
>
> Both fold and window functions were doing the same thing, taking together
> all the records for the same tenant and user (key by is used for that) and
> putting it in one batched object with arraylists for the mutations on user
> profile. After that passing this object to the sink. I can post the code if
> its needed.
>
> In case of fold I was just adding profile mutation to the list and in case
> of window function iterating over all of it and returning this batched
> entity in one go.
>
> I’m wondering if this is expected to have 20 times slower performance just
> by using fold function. I would like to know what is so costly about this,
> as intuitively I would expect fold function being a better choice here
> since I assume that window function is using more memory for buffering.
>
> Also my colleagues when they were doing PoC on flink evaluation they were
> seeing very similar results to what I am seeing now. But they were still
> using fold function. This was on flink version 1.0.3 and now I am using
> 1.2.0. So perhaps there is some regression?
>
> Please let me know what you think.
>
> Cheers,
> Kamil.
>
>
>

Re: 20 times higher throughput with Window function vs fold function, intended?

Posted by Timo Walther <tw...@apache.org>.
Hi Kamil,

the performance implications might be the result of which state the 
underlying functions are using internally. WindowFunctions use ListState 
or ReducingState, fold() uses FoldingState. It also depends on the size 
of your state and the state backend you are using. I recommend the 
following documentation page. The FoldingState might be deprecated soon, 
once a better alternative is available: 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state

I hope that helps.

Regards,
Timo

Am 29/03/17 um 11:27 schrieb Kamil Dziublinski:
> Hi guys,
>
> I\u2019m using flink on production in Mapp. We recently swapped from storm.
> Before I have put this live I was doing performance tests and I found 
> something that \u201cfeels\u201d a bit off.
> I have a simple streaming job reading from kafka, doing window for 3 
> seconds and then storing into hbase.
>
> Initially we had this second step written with a fold function, since 
> I thought performance and resource wise it\u2019s a better idea.
> But I couldn\u2019t reach more than 120k writes per second to HBase and I 
> thought hbase sink is a bottlenck here. But then I tried doing the 
> same with window function and my performance jumped to 2 millions 
> writes per second. Just wow :) Comparing to storm where I had max 320k 
> per second it is amazing.
>
> Both fold and window functions were doing the same thing, taking 
> together all the records for the same tenant and user (key by is used 
> for that) and putting it in one batched object with arraylists for the 
> mutations on user profile. After that passing this object to the sink. 
> I can post the code if its needed.
>
> In case of fold I was just adding profile mutation to the list and in 
> case of window function iterating over all of it and returning this 
> batched entity in one go.
>
> I\u2019m wondering if this is expected to have 20 times slower performance 
> just by using fold function. I would like to know what is so costly 
> about this, as intuitively I would expect fold function being a better 
> choice here since I assume that window function is using more memory 
> for buffering.
>
> Also my colleagues when they were doing PoC on flink evaluation they 
> were seeing very similar results to what I am seeing now. But they 
> were still using fold function. This was on flink version 1.0.3 and 
> now I am using 1.2.0. So perhaps there is some regression?
>
> Please let me know what you think.
>
> Cheers,
> Kamil.