Posted to user@flink.apache.org by neo21 zerro <ne...@yahoo.com> on 2016/08/03 08:14:38 UTC

Flink Kafka more consumers than partitions

Hello everybody, 

I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
In kafka I have a topic which has 20 partitions, and my flink topology reads from kafka (source) and writes to hbase (sink).

when: 
     1. flink source has parallelism set to 40 (20 of the tasks are idle): I see 10.000 requests/sec on hbase
     2. flink source has parallelism set to 20 (exact number of partitions): I see 100.000 requests/sec on hbase (so a 10x improvement)


It's clear that hbase is not the limiting factor in my topology. 
Assumption: Flink's backpressure mechanism kicks in more aggressively in the first case and limits the ingestion of tuples into the topology. 

The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure? 


Thanks guys!
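
For context, a minimal sketch of the kind of job described above, assuming the Flink 1.0.x DataStream API and the Kafka 0.8 consumer; the topic name, addresses, group id and class name are illustrative placeholders, not the poster's actual code:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaSourceParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "zk-host:2181");     // placeholder address
            props.setProperty("bootstrap.servers", "kafka-host:9092");  // placeholder address
            props.setProperty("group.id", "flink-hbase-loader");        // placeholder group id

            DataStream<String> events = env
                .addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props))
                .setParallelism(20);  // 20 partitions -> 20 source subtasks; 40 would leave 20 of them idle

            events.print();           // stand-in for the real map -> window -> hbase pipeline
            env.execute("kafka-source-parallelism-sketch");
        }
    }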

Re: Flink Kafka more consumers than partitions

Posted by neo21 zerro <ne...@yahoo.com>.
Hi Stephan, 
Yes, I use keyBy between the source and the window operator. Interesting theory, will try it out and get back to you :) 
Thanks! 

    On Wednesday, August 3, 2016 12:14 PM, Stephan Ewen <se...@apache.org> wrote:
 

Do you use a keyBy() between the source and the window operator?

One thing I can think of is the following:
  - With the higher source parallelism, you have more logical connections (each source rebalances across all window operators).
  - With source parallelism 20, you have 20 * 160 = 3200 logical connections (fewer physical TCP connections, because Flink multiplexes).
  - With source parallelism 40, you have 40 * 160 = 6400 logical connections.

With more logical connections, you need more network buffers. While you seem to have enough buffers to make the 6400 connections work, it may be just a bit too little to balance out some short-lived skew/latency effects in the network. With the 3200 connections, each connection can claim twice the number of buffers, giving it more elasticity to balance out network latency effects.
I would try to double the number of network buffers for the case where the source has a parallelism of 40, and see if that helps.
Greetings,
Stephan

On Wed, Aug 3, 2016 at 12:07 PM, Stephan Ewen <se...@apache.org> wrote:

Hi!
Are you running on ProcessingTime or on EventTime?
Thanks,
Stephan

On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:

Hi guys,

Thanks for getting back to me.

So to clarify:
    Topology wise flink kafka source (does avro deserialization and small map) -> window operator which does batching for 3 seconds -> hbase sink

Experiments:

1. flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 10.000 requests/sec on hbase
2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 100.000 requests/sec on hbase (10x more)

@Stephan as described below the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressue on the source ;) However, my understanding right now is that only backpressure can be the explanation for this situation. Since we just change the source parallelism, other things like hbase parallelism  are kept the same.

@Sameer all of those things are valid points. We make sure that we reduce the row locking by partitioning the data on the hbase sinks. We are just after why this limitation is happening. And since the same setup is used but just the source parallelism is changed I don't expect this to be a hbase issue.

Thanks guys!



On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com> wrote:
What is the parallelism of the sink or the operator which writes to the sinks in the first case. HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table. Do you know the number of regions your Hbase tables are split into.
2. Are all the sinks writing to all the regions. Meaning are you getting records in the sink operator which could potentially go to any region. This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions and my performance shot up by several orders.
3. You are probably doing this but adding puts in batches helps in general but having each batch contain puts for too many regions hurts.

If the source parallelism is the same as the parallelism of other operators the 40 sinks communicating to all regions might be a problem. When you go down to 20 sinks you actually might be getting better performance due to lesser resource contention on HBase.

Sent from my iPhone


> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>
> Hello everybody,
>
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
>
> when:
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>
>
> It's clear that hbase is not the limiting factor in my topology.
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology.
>
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure?
>
>
> Thanks guys!


Re: Flink Kafka more consumers than partitions

Posted by Stephan Ewen <se...@apache.org>.
Do you use a keyBy() between the source and the window operator?

One thing I can think of is the following:

  - With the higher source parallelism, you have more logical connections
(each source rebalances across all window operators).
  - with source parallelism 20, you have 20 * 160 = 3200 logical
connections (fewer physical TCP connections, because Flink multiplexes)
  - with source parallelism 40, you have 40 * 160 = 6400 logical
connections.

With more logical connections, you need more network buffers. While you
seem to have enough buffers to make the 6400 connections work, it may be
just a bit too little to balance out some short-lived skew/latency effects
in the network.
With the 3200 connections, each connection can claim twice the number of
buffers, giving it more elasticity to balance out network latency effects.

I would try to double the number of network buffers for the case where the
source has a parallelism of 40, and see if that helps.

Greetings,
Stephan
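
In Flink 1.0.x the knob for this is taskmanager.network.numberOfBuffers in flink-conf.yaml. A hedged example of doubling it, assuming the default of 2048 is still in place and the extra buffers fit into the TaskManager memory budget (the documentation of that era suggested sizing it at roughly #slots-per-TaskManager^2 * #TaskManagers * 4):

    # flink-conf.yaml (illustrative value; size it to the job's connections)
    taskmanager.network.numberOfBuffers: 4096
    # each buffer is one memory segment, 32 KB by default (taskmanager.memory.segment-size)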


On Wed, Aug 3, 2016 at 12:07 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Are you running on ProcessingTime or on EventTime?
>
> Thanks,
> Stephan
>
>
> On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com>
> wrote:
>
>> Hi guys,
>>
>> Thanks for getting back to me.
>>
>> So to clarify:
>>     Topology wise flink kafka source (does avro deserialization and small
>> map) -> window operator which does batching for 3 seconds -> hbase sink
>>
>> Experiments:
>>
>> 1. flink source: parallelism 40 (20 idle tasks) -> window operator:
>> parallelism 160 -> hbase sink: parallelism 160
>>     - roughly 10.000 requests/sec on hbase
>> 2. flink source: parallelism 20 -> window operator: parallelism 160 ->
>> hbase sink: parallelism 160
>>     - roughly 100.000 requests/sec on hbase (10x more)
>>
>> @Stephan as described below the parallelism of the sink was kept the
>> same. I agree with you that there is nothing to backpressue on the source
>> ;) However, my understanding right now is that only backpressure can be the
>> explanation for this situation. Since we just change the source
>> parallelism, other things like hbase parallelism  are kept the same.
>>
>> @Sameer all of those things are valid points. We make sure that we reduce
>> the row locking by partitioning the data on the hbase sinks. We are just
>> after why this limitation is happening. And since the same setup is used
>> but just the source parallelism is changed I don't expect this to be a
>> hbase issue.
>>
>> Thanks guys!
>>
>>
>>
>> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com>
>> wrote:
>> What is the parallelism of the sink or the operator which writes to the
>> sinks in the first case. HBase puts are constrained by the following:
>> 1. How your regions are distributed. Are you pre-splitting your regions
>> for the table. Do you know the number of regions your Hbase tables are
>> split into.
>> 2. Are all the sinks writing to all the regions. Meaning are you getting
>> records in the sink operator which could potentially go to any region. This
>> can become a big bottleneck if you have 40 sinks talking to all regions. I
>> pre-split my regions based on key salting and use custom partitioning to
>> ensure each sink operator writes to only a few regions and my performance
>> shot up by several orders.
>> 3. You are probably doing this but adding puts in batches helps in
>> general but having each batch contain puts for too many regions hurts.
>>
>> If the source parallelism is the same as the parallelism of other
>> operators the 40 sinks communicating to all regions might be a problem.
>> When you go down to 20 sinks you actually might be getting better
>> performance due to lesser resource contention on HBase.
>>
>> Sent from my iPhone
>>
>>
>> > On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>> >
>> > Hello everybody,
>> >
>> > I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3
>> on YARN.
>> > In kafka I have a topic which have 20 partitions and my flink topology
>> reads from kafka (source) and writes to hbase (sink).
>> >
>> > when:
>> >     1. flink source has parallelism set to 40 (20 of the tasks are
>> idle) I see 10.000 requests/sec on hbase
>> >     2. flink source has parallelism set to 20 (exact number of
>> partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>> >
>> >
>> > It's clear that hbase is not the limiting factor in my topology.
>> > Assumption: Flink backpressure mechanism kicks in in the 1. case more
>> aggressively and it's limiting the ingestion of tuples in the topology.
>> >
>> > The question: In the first case, why are those 20 sources which are
>> sitting idle contributing so much to the backpressure?
>> >
>> >
>> > Thanks guys!
>>
>
>

Re: Flink Kafka more consumers than partitions

Posted by Maximilian Michels <mx...@apache.org>.
Thanks for letting us know!

On Sat, Sep 3, 2016 at 12:42 PM, neo21 zerro <ne...@yahoo.com> wrote:
> Hi all,
>
> It turns out that there were other factors influencing my performance tests.
> (actually hbase)
> Hence, more consumers than partitions in Flink was not the problem.
> Thanks for the help!
>
>
> On Wednesday, August 3, 2016 5:42 PM, neo21 zerro <ne...@yahoo.com>
> wrote:
>
>
> Hello,
>
> I've tried to increase the network buffers but I didn't get any performance
> improvement.
> However, I have to re-run some tests just to be sure that the testing was
> not influenced by other factors.
> Will get back with more info.
> Thanks for the help for now.
>
>
> On Wednesday, August 3, 2016 12:58 PM, neo21 zerro <ne...@yahoo.com>
> wrote:
>
>
> It's the default, ProcessingTime.
>
>
> On Wednesday, August 3, 2016 12:07 PM, Stephan Ewen <se...@apache.org>
> wrote:
>
>
> Hi!
>
> Are you running on ProcessingTime or on EventTime?
>
> Thanks,
> Stephan
>
>
> On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:
>
> Hi guys,
>
> Thanks for getting back to me.
>
> So to clarify:
>     Topology wise flink kafka source (does avro deserialization and small
> map) -> window operator which does batching for 3 seconds -> hbase sink
>
> Experiments:
>
> 1. flink source: parallelism 40 (20 idle tasks) -> window operator:
> parallelism 160 -> hbase sink: parallelism 160
>     - roughly 10.000 requests/sec on hbase
> 2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase
> sink: parallelism 160
>     - roughly 100.000 requests/sec on hbase (10x more)
>
> @Stephan as described below the parallelism of the sink was kept the same. I
> agree with you that there is nothing to backpressue on the source ;)
> However, my understanding right now is that only backpressure can be the
> explanation for this situation. Since we just change the source parallelism,
> other things like hbase parallelism  are kept the same.
>
> @Sameer all of those things are valid points. We make sure that we reduce
> the row locking by partitioning the data on the hbase sinks. We are just
> after why this limitation is happening. And since the same setup is used but
> just the source parallelism is changed I don't expect this to be a hbase
> issue.
>
> Thanks guys!
>
>
>
> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com>
> wrote:
> What is the parallelism of the sink or the operator which writes to the
> sinks in the first case. HBase puts are constrained by the following:
> 1. How your regions are distributed. Are you pre-splitting your regions for
> the table. Do you know the number of regions your Hbase tables are split
> into.
> 2. Are all the sinks writing to all the regions. Meaning are you getting
> records in the sink operator which could potentially go to any region. This
> can become a big bottleneck if you have 40 sinks talking to all regions. I
> pre-split my regions based on key salting and use custom partitioning to
> ensure each sink operator writes to only a few regions and my performance
> shot up by several orders.
> 3. You are probably doing this but adding puts in batches helps in general
> but having each batch contain puts for too many regions hurts.
>
> If the source parallelism is the same as the parallelism of other operators
> the 40 sinks communicating to all regions might be a problem. When you go
> down to 20 sinks you actually might be getting better performance due to
> lesser resource contention on HBase.
>
> Sent from my iPhone
>
>
>> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>>
>> Hello everybody,
>>
>> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on
>> YARN.
>> In kafka I have a topic which have 20 partitions and my flink topology
>> reads from kafka (source) and writes to hbase (sink).
>>
>> when:
>>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I
>> see 10.000 requests/sec on hbase
>>     2. flink source has parallelism set to 20 (exact number of partitions)
>> I see 100.0000 requests/sec on hbase (so a 10x improvement)
>>
>>
>> It's clear that hbase is not the limiting factor in my topology.
>> Assumption: Flink backpressure mechanism kicks in in the 1. case more
>> aggressively and it's limiting the ingestion of tuples in the topology.
>>
>> The question: In the first case, why are those 20 sources which are
>> sitting idle contributing so much to the backpressure?
>>
>>
>> Thanks guys!
>
>
>
>
>
>
>
>

Re: Flink Kafka more consumers than partitions

Posted by neo21 zerro <ne...@yahoo.com>.
Hi all, 
It turns out that there were other factors influencing my performance tests (actually hbase). Hence, more consumers than partitions in Flink was not the problem. Thanks for the help! 

    On Wednesday, August 3, 2016 5:42 PM, neo21 zerro <ne...@yahoo.com> wrote:
 

 Hello, 
I've tried to increase the network buffers but I didn't get any performance improvement. However, I have to re-run some tests just to be sure that the testing was not influenced by other factors. Will get back with more info. Thanks for the help for now. 

    On Wednesday, August 3, 2016 12:58 PM, neo21 zerro <ne...@yahoo.com> wrote:
 

 It's the default, ProcessingTime.  

    On Wednesday, August 3, 2016 12:07 PM, Stephan Ewen <se...@apache.org> wrote:
 

 Hi!
Are you running on ProcessingTime or on EventTime?
Thanks,
Stephan

On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:

Hi guys,

Thanks for getting back to me.

So to clarify:
    Topology wise flink kafka source (does avro deserialization and small map) -> window operator which does batching for 3 seconds -> hbase sink

Experiments:

1. flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 10.000 requests/sec on hbase
2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 100.000 requests/sec on hbase (10x more)

@Stephan as described below the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressue on the source ;) However, my understanding right now is that only backpressure can be the explanation for this situation. Since we just change the source parallelism, other things like hbase parallelism  are kept the same.

@Sameer all of those things are valid points. We make sure that we reduce the row locking by partitioning the data on the hbase sinks. We are just after why this limitation is happening. And since the same setup is used but just the source parallelism is changed I don't expect this to be a hbase issue.

Thanks guys!



On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com> wrote:
What is the parallelism of the sink or the operator which writes to the sinks in the first case. HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table. Do you know the number of regions your Hbase tables are split into.
2. Are all the sinks writing to all the regions. Meaning are you getting records in the sink operator which could potentially go to any region. This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions and my performance shot up by several orders.
3. You are probably doing this but adding puts in batches helps in general but having each batch contain puts for too many regions hurts.

If the source parallelism is the same as the parallelism of other operators the 40 sinks communicating to all regions might be a problem. When you go down to 20 sinks you actually might be getting better performance due to lesser resource contention on HBase.

Sent from my iPhone


> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>
> Hello everybody,
>
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
>
> when:
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>
>
> It's clear that hbase is not the limiting factor in my topology.
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology.
>
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure?
>
>
> Thanks guys!


Re: Flink Kafka more consumers than partitions

Posted by neo21 zerro <ne...@yahoo.com>.
Hello, 
I've tried to increase the network buffers but I didn't get any performance improvement. However, I have to re-run some tests just to be sure that the testing was not influenced by other factors. Will get back with more info. Thanks for the help for now. 

    On Wednesday, August 3, 2016 12:58 PM, neo21 zerro <ne...@yahoo.com> wrote:
 

 It's the default, ProcessingTime.  

    On Wednesday, August 3, 2016 12:07 PM, Stephan Ewen <se...@apache.org> wrote:
 

 Hi!
Are you running on ProcessingTime or on EventTime?
Thanks,
Stephan

On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:

Hi guys,

Thanks for getting back to me.

So to clarify:
    Topology wise flink kafka source (does avro deserialization and small map) -> window operator which does batching for 3 seconds -> hbase sink

Experiments:

1. flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 10.000 requests/sec on hbase
2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 100.000 requests/sec on hbase (10x more)

@Stephan as described below the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressue on the source ;) However, my understanding right now is that only backpressure can be the explanation for this situation. Since we just change the source parallelism, other things like hbase parallelism  are kept the same.

@Sameer all of those things are valid points. We make sure that we reduce the row locking by partitioning the data on the hbase sinks. We are just after why this limitation is happening. And since the same setup is used but just the source parallelism is changed I don't expect this to be a hbase issue.

Thanks guys!



On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com> wrote:
What is the parallelism of the sink or the operator which writes to the sinks in the first case. HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table. Do you know the number of regions your Hbase tables are split into.
2. Are all the sinks writing to all the regions. Meaning are you getting records in the sink operator which could potentially go to any region. This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions and my performance shot up by several orders.
3. You are probably doing this but adding puts in batches helps in general but having each batch contain puts for too many regions hurts.

If the source parallelism is the same as the parallelism of other operators the 40 sinks communicating to all regions might be a problem. When you go down to 20 sinks you actually might be getting better performance due to lesser resource contention on HBase.

Sent from my iPhone


> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>
> Hello everybody,
>
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
>
> when:
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>
>
> It's clear that hbase is not the limiting factor in my topology.
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology.
>
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure?
>
>
> Thanks guys!


Re: Flink Kafka more consumers than partitions

Posted by neo21 zerro <ne...@yahoo.com>.
It's the default, ProcessingTime.  

    On Wednesday, August 3, 2016 12:07 PM, Stephan Ewen <se...@apache.org> wrote:
 

 Hi!
Are you running on ProcessingTime or on EventTime?
Thanks,
Stephan

On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:

Hi guys,

Thanks for getting back to me.

So to clarify:
    Topology wise flink kafka source (does avro deserialization and small map) -> window operator which does batching for 3 seconds -> hbase sink

Experiments:

1. flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 10.000 requests/sec on hbase
2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 100.000 requests/sec on hbase (10x more)

@Stephan as described below the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressue on the source ;) However, my understanding right now is that only backpressure can be the explanation for this situation. Since we just change the source parallelism, other things like hbase parallelism  are kept the same.

@Sameer all of those things are valid points. We make sure that we reduce the row locking by partitioning the data on the hbase sinks. We are just after why this limitation is happening. And since the same setup is used but just the source parallelism is changed I don't expect this to be a hbase issue.

Thanks guys!



On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com> wrote:
What is the parallelism of the sink or the operator which writes to the sinks in the first case. HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table. Do you know the number of regions your Hbase tables are split into.
2. Are all the sinks writing to all the regions. Meaning are you getting records in the sink operator which could potentially go to any region. This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions and my performance shot up by several orders.
3. You are probably doing this but adding puts in batches helps in general but having each batch contain puts for too many regions hurts.

If the source parallelism is the same as the parallelism of other operators the 40 sinks communicating to all regions might be a problem. When you go down to 20 sinks you actually might be getting better performance due to lesser resource contention on HBase.

Sent from my iPhone


> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
>
> Hello everybody,
>
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
>
> when:
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>
>
> It's clear that hbase is not the limiting factor in my topology.
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology.
>
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure?
>
>
> Thanks guys!


Re: Flink Kafka more consumers than partitions

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Are you running on ProcessingTime or on EventTime?

Thanks,
Stephan


On Wed, Aug 3, 2016 at 11:57 AM, neo21 zerro <ne...@yahoo.com> wrote:

> Hi guys,
>
> Thanks for getting back to me.
>
> So to clarify:
>     Topology wise flink kafka source (does avro deserialization and small
> map) -> window operator which does batching for 3 seconds -> hbase sink
>
> Experiments:
>
> 1. flink source: parallelism 40 (20 idle tasks) -> window operator:
> parallelism 160 -> hbase sink: parallelism 160
>     - roughly 10.000 requests/sec on hbase
> 2. flink source: parallelism 20 -> window operator: parallelism 160 ->
> hbase sink: parallelism 160
>     - roughly 100.000 requests/sec on hbase (10x more)
>
> @Stephan as described below the parallelism of the sink was kept the same.
> I agree with you that there is nothing to backpressue on the source ;)
> However, my understanding right now is that only backpressure can be the
> explanation for this situation. Since we just change the source
> parallelism, other things like hbase parallelism  are kept the same.
>
> @Sameer all of those things are valid points. We make sure that we reduce
> the row locking by partitioning the data on the hbase sinks. We are just
> after why this limitation is happening. And since the same setup is used
> but just the source parallelism is changed I don't expect this to be a
> hbase issue.
>
> Thanks guys!
>
>
>
> On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com>
> wrote:
> What is the parallelism of the sink or the operator which writes to the
> sinks in the first case. HBase puts are constrained by the following:
> 1. How your regions are distributed. Are you pre-splitting your regions
> for the table. Do you know the number of regions your Hbase tables are
> split into.
> 2. Are all the sinks writing to all the regions. Meaning are you getting
> records in the sink operator which could potentially go to any region. This
> can become a big bottleneck if you have 40 sinks talking to all regions. I
> pre-split my regions based on key salting and use custom partitioning to
> ensure each sink operator writes to only a few regions and my performance
> shot up by several orders.
> 3. You are probably doing this but adding puts in batches helps in general
> but having each batch contain puts for too many regions hurts.
>
> If the source parallelism is the same as the parallelism of other
> operators the 40 sinks communicating to all regions might be a problem.
> When you go down to 20 sinks you actually might be getting better
> performance due to lesser resource contention on HBase.
>
> Sent from my iPhone
>
>
> > On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
> >
> > Hello everybody,
> >
> > I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on
> YARN.
> > In kafka I have a topic which have 20 partitions and my flink topology
> reads from kafka (source) and writes to hbase (sink).
> >
> > when:
> >     1. flink source has parallelism set to 40 (20 of the tasks are idle)
> I see 10.000 requests/sec on hbase
> >     2. flink source has parallelism set to 20 (exact number of
> partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
> >
> >
> > It's clear that hbase is not the limiting factor in my topology.
> > Assumption: Flink backpressure mechanism kicks in in the 1. case more
> aggressively and it's limiting the ingestion of tuples in the topology.
> >
> > The question: In the first case, why are those 20 sources which are
> sitting idle contributing so much to the backpressure?
> >
> >
> > Thanks guys!
>

Re: Flink Kafka more consumers than partitions

Posted by neo21 zerro <ne...@yahoo.com>.
Hi guys, 

Thanks for getting back to me.

So to clarify: 
    Topology-wise: flink kafka source (does avro deserialization and a small map) -> window operator which does batching for 3 seconds -> hbase sink 

Experiments: 

1. flink source: parallelism 40 (20 idle tasks) -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 10.000 requests/sec on hbase
2. flink source: parallelism 20 -> window operator: parallelism 160 -> hbase sink: parallelism 160
    - roughly 100.000 requests/sec on hbase (10x more)
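
A hedged sketch of what experiment 2 looks like in Flink 1.0.x DataStream terms; MyEvent, AvroEventSchema, SmallEnrichmentMap, RowKeySelector, BatchPutsWindowFunction, HBaseSink and kafkaProps are illustrative placeholders, not the actual classes from the job:

    // Illustrative only, not the poster's actual code.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<MyEvent> events = env
        .addSource(new FlinkKafkaConsumer08<>("events", new AvroEventSchema(), kafkaProps))
        .setParallelism(20)                   // one source subtask per Kafka partition
        .map(new SmallEnrichmentMap())
        .setParallelism(20);

    events
        .keyBy(new RowKeySelector())          // keyBy between source and window operator
        .timeWindow(Time.seconds(3))          // the 3-second batching window
        .apply(new BatchPutsWindowFunction())
        .setParallelism(160)                  // window operator: parallelism 160
        .addSink(new HBaseSink())
        .setParallelism(160);                 // hbase sink: parallelism 160

    env.execute("kafka-to-hbase");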

@Stephan as described below, the parallelism of the sink was kept the same. I agree with you that there is nothing to backpressure on the source ;) However, my understanding right now is that only backpressure can explain this situation. Since we just change the source parallelism, other things like the hbase parallelism are kept the same. 
 
@Sameer all of those things are valid points. We make sure that we reduce row locking by partitioning the data on the hbase sinks. We are just trying to understand why this limitation is happening. And since the same setup is used and only the source parallelism is changed, I don't expect this to be an hbase issue. 

Thanks guys!



On Wednesday, August 3, 2016 11:38 AM, Sameer Wadkar <sa...@axiomine.com> wrote:
What is the parallelism of the sink or the operator which writes to the sinks in the first case. HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table. Do you know the number of regions your Hbase tables are split into. 
2. Are all the sinks writing to all the regions. Meaning are you getting records in the sink operator which could potentially go to any region. This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions and my performance shot up by several orders. 
3. You are probably doing this but adding puts in batches helps in general but having each batch contain puts for too many regions hurts. 

If the source parallelism is the same as the parallelism of other operators the 40 sinks communicating to all regions might be a problem. When you go down to 20 sinks you actually might be getting better performance due to lesser resource contention on HBase. 

Sent from my iPhone


> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
> 
> Hello everybody, 
> 
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
> 
> when: 
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
> 
> 
> It's clear that hbase is not the limiting factor in my topology. 
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology. 
> 
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure? 
> 
> 
> Thanks guys!

Re: Flink Kafka more consumers than partitions

Posted by Sameer Wadkar <sa...@axiomine.com>.
What is the parallelism of the sink, or of the operator which writes to the sinks, in the first case? HBase puts are constrained by the following:
1. How your regions are distributed. Are you pre-splitting your regions for the table? Do you know how many regions your HBase tables are split into? 
2. Are all the sinks writing to all the regions? Meaning, is the sink operator getting records which could potentially go to any region? This can become a big bottleneck if you have 40 sinks talking to all regions. I pre-split my regions based on key salting and use custom partitioning to ensure each sink operator writes to only a few regions, and my performance shot up by several orders of magnitude. 
3. You are probably doing this already, but adding puts in batches helps in general; having each batch contain puts for too many regions hurts, though. 

If the source parallelism is the same as the parallelism of the other operators, the 40 sinks communicating with all regions might be a problem. When you go down to 20 sinks you might actually be getting better performance due to less resource contention on HBase. 
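
As a hedged illustration of points 1-3 (HBase 1.x client API; the table name, column family and the choice of ten salt buckets are made-up values, and the salting scheme has to match the custom partitioning used on the Flink side):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SaltedHBaseWritesSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection connection = ConnectionFactory.createConnection(conf)) {

                // 1. Pre-split the table: split keys "1".."9" give ten regions,
                //    one per salt bucket, instead of one region that splits under load.
                try (Admin admin = connection.getAdmin()) {
                    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("events"));
                    desc.addFamily(new HColumnDescriptor("cf"));
                    byte[][] splits = new byte[9][];
                    for (int i = 1; i <= 9; i++) {
                        splits[i - 1] = Bytes.toBytes(Integer.toString(i));
                    }
                    admin.createTable(desc, splits);
                }

                // 2. Salt the row key so a given writer only ever touches a few regions,
                // 3. and send the puts as one batch rather than one RPC per record.
                try (Table table = connection.getTable(TableName.valueOf("events"))) {
                    List<Put> batch = new ArrayList<>();
                    for (String key : new String[] {"user-41", "user-42", "user-43"}) {
                        String saltedKey = (Math.abs(key.hashCode()) % 10) + "-" + key;
                        Put put = new Put(Bytes.toBytes(saltedKey));
                        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(key));
                        batch.add(put);
                    }
                    table.put(batch);
                }
            }
        }
    }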

Sent from my iPhone

> On Aug 3, 2016, at 4:14 AM, neo21 zerro <ne...@yahoo.com> wrote:
> 
> Hello everybody, 
> 
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on YARN.
> In kafka I have a topic which have 20 partitions and my flink topology reads from kafka (source) and writes to hbase (sink).
> 
> when: 
>     1. flink source has parallelism set to 40 (20 of the tasks are idle) I see 10.000 requests/sec on hbase
>     2. flink source has parallelism set to 20 (exact number of partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
> 
> 
> It's clear that hbase is not the limiting factor in my topology. 
> Assumption: Flink backpressure mechanism kicks in in the 1. case more aggressively and it's limiting the ingestion of tuples in the topology. 
> 
> The question: In the first case, why are those 20 sources which are sitting idle contributing so much to the backpressure? 
> 
> 
> Thanks guys!

Re: Flink Kafka more consumers than partitions

Posted by Stephan Ewen <se...@apache.org>.
Hi!

That is interesting, indeed. The idle sources should not create
backpressure. In fact, sources cannot create back pressure, because back
pressure pressures backwards and there is nothing backwards from the
sources ;-)

Do you also adjust the parallelism of the operator that interacts with
HBase, or just the source parallelism?

Cheers!

On Wed, Aug 3, 2016 at 10:14 AM, neo21 zerro <ne...@yahoo.com> wrote:

> Hello everybody,
>
> I'm using Flink Kafka consumer 0.8.x with kafka 0.8.2 and flink 1.0.3 on
> YARN.
> In kafka I have a topic which have 20 partitions and my flink topology
> reads from kafka (source) and writes to hbase (sink).
>
> when:
>      1. flink source has parallelism set to 40 (20 of the tasks are idle)
> I see 10.000 requests/sec on hbase
>      2. flink source has parallelism set to 20 (exact number of
> partitions) I see 100.0000 requests/sec on hbase (so a 10x improvement)
>
>
> It's clear that hbase is not the limiting factor in my topology.
> Assumption: Flink backpressure mechanism kicks in in the 1. case more
> aggressively and it's limiting the ingestion of tuples in the topology.
>
> The question: In the first case, why are those 20 sources which are
> sitting idle contributing so much to the backpressure?
>
>
> Thanks guys!
>