You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Tyson Norris <tn...@adobe.com> on 2014/10/24 06:09:37 UTC

CustomStreamGrouping for load-based routing

Hi -
We are concerned with some loads accumulating on certain workers, and having that negatively impact the efficiency of the system.

Would a CustomStreamGrouping that is based on the broadcast of load stats be a way to deal with this?

For example, in our system, the content stream and processing overhead can be adjusted by external systems, creating stream with processing costs that vary per tuple.

We are currently using shuffleGrouping, which means that there is a chance that the most “expensive” tuples will land on the same task for processing, which would be bad (for those tuples) since other tasks might be underutilized by comparison.

Fieldgrouping isn’t helpful since it may also route loads disproportionately across the cluster.

So, I’m wondering if we can create a CustomStreamGrouping that will:
- periodically post worker stats (heap/cpu usage) + worker task assignments to zookeeper
- periodically read worker stats + task assignments of other workers from zookeeper
- during chooseTasks() impl, refer to the worker stats to determine which worker is least-loaded, and use the taskIds for that worker to route the tuple.


I haven’t tried this yet, but I know there is an issue (https://issues.apache.org/jira/browse/STORM-162) created a while back suggesting to add load balancing to shuffle grouping, and this seems like a simpler alternative.

Thanks
Tyson


Re: CustomStreamGrouping for load-based routing

Posted by Tyson Norris <tn...@adobe.com>.
We unfortunately cannot pre-determine which tuples will be expensive, so we can only use a single queue, so this would not work.

Thanks
Tyson

On Oct 24, 2014, at 4:04 AM, Itai Frenkel <It...@forter.com>> wrote:

I'm not a Storm expert but I'll give it a try.
What you are describing is a system that has high resource (CPU/memory/network) variance depending on the input. Usually shuffle grouping would be enough since given large enough number of events and small enough resource consumption per event it would even out. Clearly this is not your case, which means that there is relatively small number of events or very very high variance between the events. Here is a naive solution for this problem, not sure if it fits your requirements though:

1. Deploy each topology on a single machine.
2. Each topology(=machine) has a single spout. All spouts read from the same centralized queue service.
3. The queue service has two queues. One for "cheap" events, and one for "very very expensive" events.
4. The spout (by design in storm) decides if to answer nextTuple with an emit, or do nothing. It could use internal monitoring (such as codehale metrics/JMX/sigar) to regulate the throughput. It could decide to do nothing or to emit a cheap event, or to emit expensive event.

This (somewhat naive) solution does not require involving coordination through zookeeper and is easier to code.

Regards,
Itai

________________________________
From: Tyson Norris <tn...@adobe.com>>
Sent: Friday, October 24, 2014 7:09 AM
To: <us...@storm.incubator.apache.org>>
Subject: CustomStreamGrouping for load-based routing

Hi -
We are concerned with some loads accumulating on certain workers, and having that negatively impact the efficiency of the system.

Would a CustomStreamGrouping that is based on the broadcast of load stats be a way to deal with this?

For example, in our system, the content stream and processing overhead can be adjusted by external systems, creating stream with processing costs that vary per tuple.

We are currently using shuffleGrouping, which means that there is a chance that the most “expensive” tuples will land on the same task for processing, which would be bad (for those tuples) since other tasks might be underutilized by comparison.

Fieldgrouping isn’t helpful since it may also route loads disproportionately across the cluster.

So, I’m wondering if we can create a CustomStreamGrouping that will:
- periodically post worker stats (heap/cpu usage) + worker task assignments to zookeeper
- periodically read worker stats + task assignments of other workers from zookeeper
- during chooseTasks() impl, refer to the worker stats to determine which worker is least-loaded, and use the taskIds for that worker to route the tuple.


I haven’t tried this yet, but I know there is an issue (https://issues.apache.org/jira/browse/STORM-162) created a while back suggesting to add load balancing to shuffle grouping, and this seems like a simpler alternative.

Thanks
Tyson




Re: CustomStreamGrouping for load-based routing

Posted by Itai Frenkel <It...@forter.com>.
I'm not a Storm expert but I'll give it a try.

What you are describing is a system that has high resource (CPU/memory/network) variance depending on the input. Usually shuffle grouping would be enough since given large enough number of events and small enough resource consumption per event it would even out. Clearly this is not your case, which means that there is relatively small number of events or very very high variance between the events. Here is a naive solution for this problem, not sure if it fits your requirements though:


1. Deploy each topology on a single machine.

2. Each topology(=machine) has a single spout. All spouts read from the same centralized queue service.

3. The queue service has two queues. One for "cheap" events, and one for "very very expensive" events.

4. The spout (by design in storm) decides if to answer nextTuple with an emit, or do nothing. It could use internal monitoring (such as codehale metrics/JMX/sigar) to regulate the throughput. It could decide to do nothing or to emit a cheap event, or to emit expensive event.


This (somewhat naive) solution does not require involving coordination through zookeeper and is easier to code.


Regards,

Itai


________________________________
From: Tyson Norris <tn...@adobe.com>
Sent: Friday, October 24, 2014 7:09 AM
To: <us...@storm.incubator.apache.org>
Subject: CustomStreamGrouping for load-based routing

Hi -
We are concerned with some loads accumulating on certain workers, and having that negatively impact the efficiency of the system.

Would a CustomStreamGrouping that is based on the broadcast of load stats be a way to deal with this?

For example, in our system, the content stream and processing overhead can be adjusted by external systems, creating stream with processing costs that vary per tuple.

We are currently using shuffleGrouping, which means that there is a chance that the most "expensive" tuples will land on the same task for processing, which would be bad (for those tuples) since other tasks might be underutilized by comparison.

Fieldgrouping isn't helpful since it may also route loads disproportionately across the cluster.

So, I'm wondering if we can create a CustomStreamGrouping that will:
- periodically post worker stats (heap/cpu usage) + worker task assignments to zookeeper
- periodically read worker stats + task assignments of other workers from zookeeper
- during chooseTasks() impl, refer to the worker stats to determine which worker is least-loaded, and use the taskIds for that worker to route the tuple.


I haven't tried this yet, but I know there is an issue (https://issues.apache.org/jira/browse/STORM-162) created a while back suggesting to add load balancing to shuffle grouping, and this seems like a simpler alternative.

Thanks
Tyson