Posted to user@storm.apache.org by Raphael Hsieh <ra...@gmail.com> on 2014/05/31 01:24:35 UTC

Optimizing Kafka Stream

I am in the process of optimizing my stream. Currently I expect 5 000 000
tuples to come out of my spout per minute. I am trying to beef up my
topology in order to process this in real time without falling behind.

For some reason my batch size is capping out at 83 thousand tuples. I can't
seem to make it any bigger. The processing time doesn't seem to get any
smaller than 2-3 seconds either.
I'm not sure how to configure the topology to make it any faster or more
efficient.

Currently all the topology does is a groupBy on time followed by a Count
aggregation.

Here are some data points I've collected.

Batch size: 5mb
num-workers: 1
parallelismHint: 2
(I'll write this as 5mb, 1, 2)

5mb, 1, 2  = 83K tuples / 6s
10mb, 1, 2 = 83K tuples / 7s
5mb, 1, 4  = 83K tuples / 6s
5mb, 2, 4  = 83K tuples / 3s
5mb, 3, 6  = 83K tuples / 3s
10mb, 3, 6 = 83K tuples / 3s

Can anybody help me figure out how to get it to process things faster?

My maxSpoutPending is at 1, but when I increased it to 2 the results were
the same.
MessageTimeoutSec = 100

I've been following this blog: https://gist.github.com/mrflip/5958028
to an extent, though not word for word.

I need to be able to process around 66,000 tuples per second and I'm
starting to run out of ideas.
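
As a rough check of the measurements above, here is a minimal sketch in plain
Java (no Storm dependency). It assumes batches are processed one at a time,
as with maxSpoutPending = 1, so throughput is roughly batch size divided by
batch processing time. The class and method names are hypothetical and exist
only for this illustration.

```java
// Rough throughput arithmetic for the data points above.
// ThroughputCheck and its methods are hypothetical names, not part of Storm.
final class ThroughputCheck {

    /** Tuples per second when one batch of batchTuples takes batchSeconds to process. */
    static double tuplesPerSecond(long batchTuples, double batchSeconds) {
        return batchTuples / batchSeconds;
    }

    /** Rate needed to keep up with tuplesPerMinute coming out of the spout. */
    static double requiredPerSecond(long tuplesPerMinute) {
        return tuplesPerMinute / 60.0;
    }

    public static void main(String[] args) {
        long batchTuples = 83_000;           // observed batch size
        double[] batchSeconds = {6, 7, 3};   // observed processing times
        for (double s : batchSeconds) {
            System.out.printf("83K tuples / %.0fs = %.0f tuples/s%n",
                    s, tuplesPerSecond(batchTuples, s));
        }
        System.out.printf("needed for 5,000,000 tuples/min = %.0f tuples/s%n",
                requiredPerSecond(5_000_000));
    }
}
```

Under that assumption, the best case measured (83K tuples in 3s) is about
27,700 tuples per second. The stated target is 66,000 per second, and
5,000,000 per minute works out to about 83,333 per second.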

Thanks

-- 
Raphael Hsieh

Re: Optimizing Kafka Stream

Posted by Raphael Hsieh <ra...@gmail.com>.
Oh ok. Thanks Chi!
Do you have any ideas about why my batch size never seems to get any bigger
than 83K tuples?
Currently I'm just using a barebones topology that looks like this:

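// Spout arguments and parallelism hint values are elided.
Stream spout = topology.newStream("...", ...)
   .parallelismHint(...)
   .groupBy(new Fields("time"))                  // group tuples by their "time" field
   .aggregate(new Count(), new Fields("count"))  // count the tuples in each group
   .parallelismHint(...)
   .each(new Fields("time", "count"), new PrintFilter());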

All the stream does is aggregate tuples that share the same timestamp and
print out the count.

In my config I've set the batch size to 10mb like so:
Config config = new Config();
config.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 1024*1024*10);

When I set the batch size to 5mb or even 1mb there is no difference;
everything always adds up to roughly 83K tuples.

To count how many tuples are in a batch, I look at the system timestamp at
which each line is printed (in the print filter), and add up the count
values of all the print statements that share the same timestamp.
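
The counting method described above can be sketched in plain Java as follows.
This is only an illustration: the class name, method name, and the
{timestamp, count} row layout are assumptions, not the actual PrintFilter
code.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper illustrating the "sum counts by print timestamp" estimate.
final class BatchSizeEstimate {

    /**
     * Each row is {printTimestamp, count}, one row per line printed by the
     * print filter. Returns the total count for each distinct timestamp,
     * ordered by timestamp.
     */
    static Map<Long, Long> sumByTimestamp(long[][] printed) {
        Map<Long, Long> totals = new TreeMap<>();
        for (long[] row : printed) {
            // Add this line's count to the running total for its timestamp.
            totals.merge(row[0], row[1], Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        long[][] printed = {
            {1000L, 50_000L},
            {1000L, 33_000L},
            {2000L, 83_000L},
        };
        System.out.println(sumByTimestamp(printed));
    }
}
```

Note that this is an estimate: lines from one batch may be printed across a
timestamp boundary, and lines from different batches may share a timestamp,
depending on the resolution of the timestamp.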

On Mon, Jun 2, 2014 at 10:09 AM, Chi Hoang <ch...@groupon.com> wrote:

> Raphael,
> The number of partitions is defined in your Kafka configuration -
> http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions)
> - or when you create the topic.  The behavior is different for each version
> of Kafka, so you should read more documentation.  Your topology needs to
> match the Kafka configuration for the topic.
>
> Chi



-- 
Raphael Hsieh

Re: Optimizing Kafka Stream

Posted by Chi Hoang <ch...@groupon.com>.
Raphael,
The number of partitions is defined in your Kafka configuration -
http://kafka.apache.org/documentation.html#brokerconfigs (num.partitions) -
or when you create the topic.  The behavior differs between Kafka versions,
so check the documentation for the version you are running.  Your topology
needs to match the Kafka configuration for the topic.

Chi


On Mon, Jun 2, 2014 at 8:46 AM, Raphael Hsieh <ra...@gmail.com> wrote:

> Thanks for the tips Chi,
> I'm a little confused about the partitioning. I had thought that the
> number of partitions was determined by the amount of parallelism in the
> topology. For example if I said .parallelismHint(4), then I would have 4
> different partitions. Is this not the case ?
> Is there a set number of partitions my topology has that I need to
> increase in order to have higher parallelism ?
>
> Thanks



-- 
Data Systems Engineering
data-systems@groupon.com

Re: Optimizing Kafka Stream

Posted by Raphael Hsieh <ra...@gmail.com>.
Thanks for the tips Chi,
I'm a little confused about the partitioning. I had thought that the number
of partitions was determined by the amount of parallelism in the topology.
For example, if I said .parallelismHint(4), then I would have 4 different
partitions. Is this not the case?
Is there a set number of partitions my topology has that I need to increase
in order to have higher parallelism?

Thanks


On Sat, May 31, 2014 at 11:50 AM, Chi Hoang <ch...@groupon.com> wrote:

> Raphael,
> You can try tuning your parallelism (and num workers).
>
> For Kafka 0.7, your spout parallelism could max out at: # brokers x #
> partitions (for the topic).  If you have 4 Kafka brokers, and your topic
> has 5 partitions, then you could set the spout parallelism to 20 to
> maximize the throughput.
>
> For Kafka 0.8+, your spout parallelism could max out at # partitions for
> the topic, so if your topic has 5 partitions, then you would set the spout
> parallelism to 5.  To increase parallelism, you would need to increase the
> number of partitions for your topic (by using the add partitions utility).
>
> As for the number of workers, setting it to 1 means that your spout will
> only run on a single Storm node, and would likely share resources with
> other Storm processes (spouts and bolts).  I recommend to increase the
> number of workers so Storm has a chance to spread out the work, and keep a
> good balance.
>
> Hope this helps.
>
> Chi


-- 
Raphael Hsieh

Re: Optimizing Kafka Stream

Posted by Chi Hoang <ch...@groupon.com>.
Raphael,
You can try tuning your parallelism (and num workers).

For Kafka 0.7, your spout parallelism could max out at: # brokers x #
partitions (for the topic).  If you have 4 Kafka brokers, and your topic
has 5 partitions, then you could set the spout parallelism to 20 to
maximize the throughput.

For Kafka 0.8+, your spout parallelism could max out at # partitions for
the topic, so if your topic has 5 partitions, then you would set the spout
parallelism to 5.  To increase parallelism, you would need to increase the
number of partitions for your topic (by using the add partitions utility).
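
The two rules above can be written out as a small sketch in plain Java. The
class and method names are hypothetical, and the code only restates the
arithmetic; it does not query Kafka or Storm.

```java
// Hypothetical helper restating the spout parallelism limits described above.
final class SpoutParallelism {

    /** Kafka 0.7: the limit is (number of brokers) x (partitions for the topic). */
    static int maxForKafka07(int brokers, int partitions) {
        if (brokers < 1 || partitions < 1) {
            throw new IllegalArgumentException("brokers and partitions must be >= 1");
        }
        return brokers * partitions;
    }

    /** Kafka 0.8+: the limit is the number of partitions for the topic. */
    static int maxForKafka08(int topicPartitions) {
        if (topicPartitions < 1) {
            throw new IllegalArgumentException("topicPartitions must be >= 1");
        }
        return topicPartitions;
    }

    public static void main(String[] args) {
        // The examples from the text: 4 brokers and 5 partitions.
        System.out.println("Kafka 0.7:  " + maxForKafka07(4, 5));
        System.out.println("Kafka 0.8+: " + maxForKafka08(5));
    }
}
```

With 4 brokers and 5 partitions this gives 20 for Kafka 0.7 and 5 for
Kafka 0.8+, matching the examples above.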

As for the number of workers, setting it to 1 means that your spout will
only run on a single Storm node, and will likely share resources with
other Storm processes (spouts and bolts).  I recommend increasing the
number of workers so that Storm has a chance to spread out the work and
keep a good balance.

Hope this helps.

Chi


On Fri, May 30, 2014 at 4:24 PM, Raphael Hsieh <ra...@gmail.com> wrote:

> I am in the process of optimizing my stream. Currently I expect 5 000 000
> tuples to come out of my spout per minute. I am trying to beef up my
> topology in order to process this in real time without falling behind.
>
> For some reason my batch size is capping out at 83 thousand tuples. I
> can't seem to make it any bigger. the processing time doesn't seem to get
> any smaller than 2-3 seconds either.
> I'm not sure how to configure the topology to get any faster / more
> efficient.
>
> Currently all the topology does is a groupby on time and an aggregation
> (Count) to aggregate everything.
>
> Here are some data points i've figured out.
>
> Batch Size:5mb
> num-workers: 1
> parallelismHint: 2
> (I'll write this a 5mb, 1, 2)
>
> 5mb, 1, 2 = 83K tuples / 6s
> 10mb, 1, 2 = 83k / 7s
> 5mb, 1, 4 = 83k / 6s
> 5mb, 2, 4 = 83k / 3s
> 5mb, 3, 6 = 83k / 3s
> 10mb, 3, 6 = 83k / 3s
>
> Can anybody help me figure out how to get it to process things faster ?
>
> My maxSpoutPending is at 1, but when I increased it to 2 it was the same.
> MessageTimeoutSec = 100
>
> I've been following this blog: https://gist.github.com/mrflip/5958028
> to an extent, not everything word for word though.
>
> I need to be able to process around 66,000 tuples per second and I'm
> starting to run out of ideas.
>
> Thanks
>
> --
> Raphael Hsieh
>
>
>