You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by 叶韵 <qi...@gmail.com> on 2022/06/29 12:37:24 UTC

[DISCUSS] PIP-181: Provide new load balance placement strategy implementation for ModularLoadManagerStrategy

Hi Pulsar community:
I open a pip to discuss "Shadow Topic, an alternative way to support
readonly topic ownership."
Proposal Link:

   - issue link: https://github.com/apache/pulsar/issues/16274
   - pr link: https://github.com/apache/pulsar/pull/16281

---
## Motivation
The modular load manager, implemented in `ModularLoadManagerImpl`, is a
flexible alternative to the previously implemented load manager, which
attempts to simplify how load is managed while also providing abstractions
so that complex load management strategies may be implemented.

The load management component determines the criteria for unloading bundles
and contains the following load shedding strategy: `OverloadShedder` and
`ThresholdShedder` and `UniformLoadShedder`. (default is
`ThresholdShedder`since 2.10.0)
- `OverloadShedder`: This strategy attempts to shed exactly one bundle on
brokers which are overloaded
- `ThresholdShedder`: This strategy unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold.
- `UniformLoadShedder`:This strategy tends to distribute load uniformly
across all brokers.

However, the bundle placement strategy contains only one:
`LeastLongTermMessageRate`, which selects a broker based on which one has
the least long term message rate.

The load management in our pulsar cluster use `ThresholdShedder` as load
shedding strategy, and use `LeastLongTermMessageRate` as bundle placement
strategy, which does not work well.
Some broker nodes have a high load when the traffic of some topics are
relatively large. The load shedding strategy will unload some bundles in
any broker that exceeds the average resource utilization of all brokers by
a configured threshold. And the bundles will be transferred to the next
broker node. However it causes the load of the next broker node exceed the
average resource utilization. Therefore, the load balancing will occur
again on the current broker node due to high load. Worse yet, this scenario
keeps popping up.

The load shedding strategy configuration is as follows
```
# load shedding strategy, support OverloadShedder and ThresholdShedder,
default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder

# The broker resource usage threshold.
# When the broker resource usage is greater than the pulsar cluster average
resource usage,
# the threshold shedder will be triggered to offload bundles from the
broker.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=0.1

# The direct memory usage weight when calculating new resource usage.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=0.1

# Bundle unload minimum throughput threshold (MB), avoiding bundle unload
frequently.
# It only takes effect in the ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=0.1
```
The following screenshots are the status of the cluster:
Problem 1. Load balancing took a long time 10 hours and over 400 times, and
it has been unloading if there is a large traffic.
<img width="1247" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png
">
<img width="1245" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png
">

Problem 2. The effect of cpu balancing is poor.
<img width="1247" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png
">
<img width="1246" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png
">

The load shedding strategy `ThresholdShedder` work well, but not the bundle
placement strategyLeastLongTermMessageRate .
There are 3 possible reasons for the problems.
1. Although the cluster has many brokers with low load, there are fewer
brokers to be considered for assignment.
<img width="1168" alt="image" src="
https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
">

Some brokers with lower load but more bundles can not be candidate due to
distributing bundles evenly in LoadManager by force. Most of brokers are
filtered out by the strategy, only 1 or 2 brokers can be candidate in the
total 136 brokers . It was fixed by #16059

2. The memory usage of Java programs fluctuates widely, so that the maximum
resource usage calculated is based on memory usage most of the time, which
filters out brokers with low CPU load. Below is the sample of two brokers
jvm memory usage in the cluster.
<img width="1249" alt="image" src="
https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
">
If the broker is overload, it will get highest score, which prevents it
from being a candidate.
<img width="1059" alt="image" src="
https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
">

3. The bundle placement strategy is `LeastLongTermMessageRate`, which
selects a broker based on which one has the least long term message rate
instead of load metric. The `LeastLongTermMessageRate` does not get along
with `ThresholdShedder` well. Therefore, a load-based bundle placement
strategy is necessary to cooperate with `ThresholdShedder`.

### Current implementation details
The `ThresholdShedder` strategy that unloads any broker that exceeds the
average resource utilization of all brokers by a configured threshold. As a
consequence, this strategy tends to distribute load among all brokers. It
does this by first computing the average resource usage per broker for the
whole cluster. The resource usage for each broker is calculated using the
following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The
weights for each resource are configurable. Historical observations are
included in the running average based on the broker's setting for
loadBalancerHistoryResourcePercentage. Once the average resource usage is
calculated, a broker's current/historical usage is compared to the average
broker usage. If a broker's usage is greater than the average usage per
broker plus the loadBalancerBrokerThresholdShedderPercentage, this load
shedder proposes removing enough bundles to bring the unloaded broker 5%
below the current average broker usage. Note that recently unloaded bundles
are not unloaded again.

## Goal
Develop a new load-based bundle placement strategy for better load
balancing with fewer times, and less time, which cab achieve better
teamwork with `ThresholdShedder`.

## API Changes
No user-facing API changes are required.

## Implementation
This should be a detailed description of all the changes that are
expected to be made. It should be detailed enough that any developer that is
familiar with Pulsar internals would be able to understand all the parts of
the
code changes for this proposal.

This should also serve as documentation for any person that is trying to
understand or debug the behavior of a certain feature.

The main idea of the new strategy is to unify the requirement of load
shedding strategy and bundle placement strategy, which consider the
resource usage with weight, including historical observations.

How to calculate a score for a broker ?
- use its historical load and short-term load data with weight.

How to select a broker for assignning bundle ?
- select a broker based on which one has the least resource usage with
weight.

### New configuration options
The existing cache implementation will not be removed at this point. Users
will
be able to configure the old implementation in `broker.conf`.
This option will be helpful in case of performance regressions would be
seen for
some use cases with the new strategy implementation.
```
# load assignment strategy, support LeastLongTermMessageRate and
LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
```

Below are screenshots of the effect of the new strategy with less time and
fewer load balancing times.
<img width="1593" alt="image" src="
https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png
">
<img width="1586" alt="image" src="
https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png
">

## Reject Alternatives
None yet.

## Reference
[1] https://github.com/apache/pulsar/pull/16059
[2] https://github.com/apache/pulsar/issues/16274
[3] https://github.com/apache/pulsar/pull/16281

-- 
BR,
Qiang Huang

Re: [DISCUSS] PIP-181: Provide new load balance placement strategy implementation for ModularLoadManagerStrategy

Posted by Haiting Jiang <ji...@apache.org>.
PR #16319 does the similar thing. Just sync the info to this mail thread.

IMO, the real problem is the definition of broker load (aka resource usage) is not consistent in 
bundle shedder and bundle assignment. The load balance module should have a better 
abstraction that contains both shedding strategy and loading strategy for bundles, so that it 
can perform a clear balancing target, which can be message rate or cpu load or an abstract 
resource usage like `MaxResourceUsageWithWeight`.

[1] https://github.com/apache/pulsar/pull/16319

Thanks, 
Haiting

On 2022/06/29 12:37:24 叶韵 wrote:
> Hi Pulsar community:
> I open a pip to discuss "Shadow Topic, an alternative way to support
> readonly topic ownership."
> Proposal Link:
> 
>    - issue link: https://github.com/apache/pulsar/issues/16274
>    - pr link: https://github.com/apache/pulsar/pull/16281
> 
> ---
> ## Motivation
> The modular load manager, implemented in `ModularLoadManagerImpl`, is a
> flexible alternative to the previously implemented load manager, which
> attempts to simplify how load is managed while also providing abstractions
> so that complex load management strategies may be implemented.
> 
> The load management component determines the criteria for unloading bundles
> and contains the following load shedding strategy: `OverloadShedder` and
> `ThresholdShedder` and `UniformLoadShedder`. (default is
> `ThresholdShedder`since 2.10.0)
> - `OverloadShedder`: This strategy attempts to shed exactly one bundle on
> brokers which are overloaded
> - `ThresholdShedder`: This strategy unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold.
> - `UniformLoadShedder`:This strategy tends to distribute load uniformly
> across all brokers.
> 
> However, the bundle placement strategy contains only one:
> `LeastLongTermMessageRate`, which selects a broker based on which one has
> the least long term message rate.
> 
> The load management in our pulsar cluster use `ThresholdShedder` as load
> shedding strategy, and use `LeastLongTermMessageRate` as bundle placement
> strategy, which does not work well.
> Some broker nodes have a high load when the traffic of some topics are
> relatively large. The load shedding strategy will unload some bundles in
> any broker that exceeds the average resource utilization of all brokers by
> a configured threshold. And the bundles will be transferred to the next
> broker node. However it causes the load of the next broker node exceed the
> average resource utilization. Therefore, the load balancing will occur
> again on the current broker node due to high load. Worse yet, this scenario
> keeps popping up.
> 
> The load shedding strategy configuration is as follows
> ```
> # load shedding strategy, support OverloadShedder and ThresholdShedder,
> default is OverloadShedder
> loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
> 
> # The broker resource usage threshold.
> # When the broker resource usage is greater than the pulsar cluster average
> resource usage,
> # the threshold shedder will be triggered to offload bundles from the
> broker.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBrokerThresholdShedderPercentage=10
> 
> # When calculating new resource usage, the history usage accounts for.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerHistoryResourcePercentage=0.9
> 
> # The BandWithIn usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithInResourceWeight=1.0
> 
> # The BandWithOut usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithOutResourceWeight=1.0
> 
> # The CPU usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerCPUResourceWeight=1.0
> 
> # The heap memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerMemoryResourceWeight=0.1
> 
> # The direct memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerDirectMemoryResourceWeight=0.1
> 
> # Bundle unload minimum throughput threshold (MB), avoiding bundle unload
> frequently.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBundleUnloadMinThroughputThreshold=0.1
> ```
> The following screenshots are the status of the cluster:
> Problem 1. Load balancing took a long time 10 hours and over 400 times, and
> it has been unloading if there is a large traffic.
> <img width="1247" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png
> ">
> <img width="1245" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png
> ">
> 
> Problem 2. The effect of cpu balancing is poor.
> <img width="1247" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png
> ">
> <img width="1246" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png
> ">
> 
> The load shedding strategy `ThresholdShedder` work well, but not the bundle
> placement strategyLeastLongTermMessageRate .
> There are 3 possible reasons for the problems.
> 1. Although the cluster has many brokers with low load, there are fewer
> brokers to be considered for assignment.
> <img width="1168" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
> ">
> 
> Some brokers with lower load but more bundles can not be candidate due to
> distributing bundles evenly in LoadManager by force. Most of brokers are
> filtered out by the strategy, only 1 or 2 brokers can be candidate in the
> total 136 brokers . It was fixed by #16059
> 
> 2. The memory usage of Java programs fluctuates widely, so that the maximum
> resource usage calculated is based on memory usage most of the time, which
> filters out brokers with low CPU load. Below is the sample of two brokers
> jvm memory usage in the cluster.
> <img width="1249" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
> ">
> If the broker is overload, it will get highest score, which prevents it
> from being a candidate.
> <img width="1059" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
> ">
> 
> 3. The bundle placement strategy is `LeastLongTermMessageRate`, which
> selects a broker based on which one has the least long term message rate
> instead of load metric. The `LeastLongTermMessageRate` does not get along
> with `ThresholdShedder` well. Therefore, a load-based bundle placement
> strategy is necessary to cooperate with `ThresholdShedder`.
> 
> ### Current implementation details
> The `ThresholdShedder` strategy that unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold. As a
> consequence, this strategy tends to distribute load among all brokers. It
> does this by first computing the average resource usage per broker for the
> whole cluster. The resource usage for each broker is calculated using the
> following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The
> weights for each resource are configurable. Historical observations are
> included in the running average based on the broker's setting for
> loadBalancerHistoryResourcePercentage. Once the average resource usage is
> calculated, a broker's current/historical usage is compared to the average
> broker usage. If a broker's usage is greater than the average usage per
> broker plus the loadBalancerBrokerThresholdShedderPercentage, this load
> shedder proposes removing enough bundles to bring the unloaded broker 5%
> below the current average broker usage. Note that recently unloaded bundles
> are not unloaded again.
> 
> ## Goal
> Develop a new load-based bundle placement strategy for better load
> balancing with fewer times, and less time, which cab achieve better
> teamwork with `ThresholdShedder`.
> 
> ## API Changes
> No user-facing API changes are required.
> 
> ## Implementation
> This should be a detailed description of all the changes that are
> expected to be made. It should be detailed enough that any developer that is
> familiar with Pulsar internals would be able to understand all the parts of
> the
> code changes for this proposal.
> 
> This should also serve as documentation for any person that is trying to
> understand or debug the behavior of a certain feature.
> 
> The main idea of the new strategy is to unify the requirement of load
> shedding strategy and bundle placement strategy, which consider the
> resource usage with weight, including historical observations.
> 
> How to calculate a score for a broker ?
> - use its historical load and short-term load data with weight.
> 
> How to select a broker for assignning bundle ?
> - select a broker based on which one has the least resource usage with
> weight.
> 
> ### New configuration options
> The existing cache implementation will not be removed at this point. Users
> will
> be able to configure the old implementation in `broker.conf`.
> This option will be helpful in case of performance regressions would be
> seen for
> some use cases with the new strategy implementation.
> ```
> # load assignment strategy, support LeastLongTermMessageRate and
> LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
> loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
> ```
> 
> Below are screenshots of the effect of the new strategy with less time and
> fewer load balancing times.
> <img width="1593" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png
> ">
> <img width="1586" alt="image" src="
> https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png
> ">
> 
> ## Reject Alternatives
> None yet.
> 
> ## Reference
> [1] https://github.com/apache/pulsar/pull/16059
> [2] https://github.com/apache/pulsar/issues/16274
> [3] https://github.com/apache/pulsar/pull/16281
> 
> -- 
> BR,
> Qiang Huang
> 

Re: [DISCUSS] PIP-181: Provide new load balance placement strategy implementation for ModularLoadManagerStrategy

Posted by Qiang Huang <qi...@gmail.com>.
Thank you. I resolved. PTAL.

Heesung Sohn <he...@streamnative.io.invalid> 于2022年7月1日周五 02:35写道:

> Hi 叶韵,
> The proposal looks good to me, and I left comments in the PR,
> https://github.com/apache/pulsar/pull/16281/.
>
> Regards,
> Heesung
>
> On Wed, Jun 29, 2022 at 5:39 AM 叶韵 <qi...@gmail.com> wrote:
>
> > Hi Pulsar community:
> > I open a pip to discuss "Shadow Topic, an alternative way to support
> > readonly topic ownership."
> > Proposal Link:
> >
> >    - issue link: https://github.com/apache/pulsar/issues/16274
> >    - pr link: https://github.com/apache/pulsar/pull/16281
> >
> > ---
> > ## Motivation
> > The modular load manager, implemented in `ModularLoadManagerImpl`, is a
> > flexible alternative to the previously implemented load manager, which
> > attempts to simplify how load is managed while also providing
> abstractions
> > so that complex load management strategies may be implemented.
> >
> > The load management component determines the criteria for unloading
> bundles
> > and contains the following load shedding strategy: `OverloadShedder` and
> > `ThresholdShedder` and `UniformLoadShedder`. (default is
> > `ThresholdShedder`since 2.10.0)
> > - `OverloadShedder`: This strategy attempts to shed exactly one bundle on
> > brokers which are overloaded
> > - `ThresholdShedder`: This strategy unloads any broker that exceeds the
> > average resource utilization of all brokers by a configured threshold.
> > - `UniformLoadShedder`:This strategy tends to distribute load uniformly
> > across all brokers.
> >
> > However, the bundle placement strategy contains only one:
> > `LeastLongTermMessageRate`, which selects a broker based on which one has
> > the least long term message rate.
> >
> > The load management in our pulsar cluster use `ThresholdShedder` as load
> > shedding strategy, and use `LeastLongTermMessageRate` as bundle placement
> > strategy, which does not work well.
> > Some broker nodes have a high load when the traffic of some topics are
> > relatively large. The load shedding strategy will unload some bundles in
> > any broker that exceeds the average resource utilization of all brokers
> by
> > a configured threshold. And the bundles will be transferred to the next
> > broker node. However it causes the load of the next broker node exceed
> the
> > average resource utilization. Therefore, the load balancing will occur
> > again on the current broker node due to high load. Worse yet, this
> scenario
> > keeps popping up.
> >
> > The load shedding strategy configuration is as follows
> > ```
> > # load shedding strategy, support OverloadShedder and ThresholdShedder,
> > default is OverloadShedder
> >
> >
> loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
> >
> > # The broker resource usage threshold.
> > # When the broker resource usage is greater than the pulsar cluster
> average
> > resource usage,
> > # the threshold shedder will be triggered to offload bundles from the
> > broker.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerBrokerThresholdShedderPercentage=10
> >
> > # When calculating new resource usage, the history usage accounts for.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerHistoryResourcePercentage=0.9
> >
> > # The BandWithIn usage weight when calculating new resource usage.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerBandwithInResourceWeight=1.0
> >
> > # The BandWithOut usage weight when calculating new resource usage.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerBandwithOutResourceWeight=1.0
> >
> > # The CPU usage weight when calculating new resource usage.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerCPUResourceWeight=1.0
> >
> > # The heap memory usage weight when calculating new resource usage.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerMemoryResourceWeight=0.1
> >
> > # The direct memory usage weight when calculating new resource usage.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerDirectMemoryResourceWeight=0.1
> >
> > # Bundle unload minimum throughput threshold (MB), avoiding bundle unload
> > frequently.
> > # It only takes effect in the ThresholdShedder strategy.
> > loadBalancerBundleUnloadMinThroughputThreshold=0.1
> > ```
> > The following screenshots are the status of the cluster:
> > Problem 1. Load balancing took a long time 10 hours and over 400 times,
> and
> > it has been unloading if there is a large traffic.
> > <img width="1247" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png
> > ">
> > <img width="1245" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png
> > ">
> >
> > Problem 2. The effect of cpu balancing is poor.
> > <img width="1247" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png
> > ">
> > <img width="1246" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png
> > ">
> >
> > The load shedding strategy `ThresholdShedder` work well, but not the
> bundle
> > placement strategyLeastLongTermMessageRate .
> > There are 3 possible reasons for the problems.
> > 1. Although the cluster has many brokers with low load, there are fewer
> > brokers to be considered for assignment.
> > <img width="1168" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
> > ">
> >
> > Some brokers with lower load but more bundles can not be candidate due to
> > distributing bundles evenly in LoadManager by force. Most of brokers are
> > filtered out by the strategy, only 1 or 2 brokers can be candidate in the
> > total 136 brokers . It was fixed by #16059
> >
> > 2. The memory usage of Java programs fluctuates widely, so that the
> maximum
> > resource usage calculated is based on memory usage most of the time,
> which
> > filters out brokers with low CPU load. Below is the sample of two brokers
> > jvm memory usage in the cluster.
> > <img width="1249" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
> > ">
> > If the broker is overload, it will get highest score, which prevents it
> > from being a candidate.
> > <img width="1059" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
> > ">
> >
> > 3. The bundle placement strategy is `LeastLongTermMessageRate`, which
> > selects a broker based on which one has the least long term message rate
> > instead of load metric. The `LeastLongTermMessageRate` does not get along
> > with `ThresholdShedder` well. Therefore, a load-based bundle placement
> > strategy is necessary to cooperate with `ThresholdShedder`.
> >
> > ### Current implementation details
> > The `ThresholdShedder` strategy that unloads any broker that exceeds the
> > average resource utilization of all brokers by a configured threshold.
> As a
> > consequence, this strategy tends to distribute load among all brokers. It
> > does this by first computing the average resource usage per broker for
> the
> > whole cluster. The resource usage for each broker is calculated using the
> > following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The
> > weights for each resource are configurable. Historical observations are
> > included in the running average based on the broker's setting for
> > loadBalancerHistoryResourcePercentage. Once the average resource usage is
> > calculated, a broker's current/historical usage is compared to the
> average
> > broker usage. If a broker's usage is greater than the average usage per
> > broker plus the loadBalancerBrokerThresholdShedderPercentage, this load
> > shedder proposes removing enough bundles to bring the unloaded broker 5%
> > below the current average broker usage. Note that recently unloaded
> bundles
> > are not unloaded again.
> >
> > ## Goal
> > Develop a new load-based bundle placement strategy for better load
> > balancing with fewer times, and less time, which cab achieve better
> > teamwork with `ThresholdShedder`.
> >
> > ## API Changes
> > No user-facing API changes are required.
> >
> > ## Implementation
> > This should be a detailed description of all the changes that are
> > expected to be made. It should be detailed enough that any developer that
> > is
> > familiar with Pulsar internals would be able to understand all the parts
> of
> > the
> > code changes for this proposal.
> >
> > This should also serve as documentation for any person that is trying to
> > understand or debug the behavior of a certain feature.
> >
> > The main idea of the new strategy is to unify the requirement of load
> > shedding strategy and bundle placement strategy, which consider the
> > resource usage with weight, including historical observations.
> >
> > How to calculate a score for a broker ?
> > - use its historical load and short-term load data with weight.
> >
> > How to select a broker for assignning bundle ?
> > - select a broker based on which one has the least resource usage with
> > weight.
> >
> > ### New configuration options
> > The existing cache implementation will not be removed at this point.
> Users
> > will
> > be able to configure the old implementation in `broker.conf`.
> > This option will be helpful in case of performance regressions would be
> > seen for
> > some use cases with the new strategy implementation.
> > ```
> > # load assignment strategy, support LeastLongTermMessageRate and
> > LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
> >
> >
> loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
> > ```
> >
> > Below are screenshots of the effect of the new strategy with less time
> and
> > fewer load balancing times.
> > <img width="1593" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png
> > ">
> > <img width="1586" alt="image" src="
> >
> >
> https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png
> > ">
> >
> > ## Reject Alternatives
> > None yet.
> >
> > ## Reference
> > [1] https://github.com/apache/pulsar/pull/16059
> > [2] https://github.com/apache/pulsar/issues/16274
> > [3] https://github.com/apache/pulsar/pull/16281
> >
> > --
> > BR,
> > Qiang Huang
> >
>


-- 
BR,
Qiang Huang

Re: [DISCUSS] PIP-181: Provide new load balance placement strategy implementation for ModularLoadManagerStrategy

Posted by Heesung Sohn <he...@streamnative.io.INVALID>.
Hi 叶韵,
The proposal looks good to me, and I left comments in the PR,
https://github.com/apache/pulsar/pull/16281/.

Regards,
Heesung

On Wed, Jun 29, 2022 at 5:39 AM 叶韵 <qi...@gmail.com> wrote:

> Hi Pulsar community:
> I open a pip to discuss "Shadow Topic, an alternative way to support
> readonly topic ownership."
> Proposal Link:
>
>    - issue link: https://github.com/apache/pulsar/issues/16274
>    - pr link: https://github.com/apache/pulsar/pull/16281
>
> ---
> ## Motivation
> The modular load manager, implemented in `ModularLoadManagerImpl`, is a
> flexible alternative to the previously implemented load manager, which
> attempts to simplify how load is managed while also providing abstractions
> so that complex load management strategies may be implemented.
>
> The load management component determines the criteria for unloading bundles
> and contains the following load shedding strategy: `OverloadShedder` and
> `ThresholdShedder` and `UniformLoadShedder`. (default is
> `ThresholdShedder`since 2.10.0)
> - `OverloadShedder`: This strategy attempts to shed exactly one bundle on
> brokers which are overloaded
> - `ThresholdShedder`: This strategy unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold.
> - `UniformLoadShedder`:This strategy tends to distribute load uniformly
> across all brokers.
>
> However, the bundle placement strategy contains only one:
> `LeastLongTermMessageRate`, which selects a broker based on which one has
> the least long term message rate.
>
> The load management in our pulsar cluster use `ThresholdShedder` as load
> shedding strategy, and use `LeastLongTermMessageRate` as bundle placement
> strategy, which does not work well.
> Some broker nodes have a high load when the traffic of some topics are
> relatively large. The load shedding strategy will unload some bundles in
> any broker that exceeds the average resource utilization of all brokers by
> a configured threshold. And the bundles will be transferred to the next
> broker node. However it causes the load of the next broker node exceed the
> average resource utilization. Therefore, the load balancing will occur
> again on the current broker node due to high load. Worse yet, this scenario
> keeps popping up.
>
> The load shedding strategy configuration is as follows
> ```
> # load shedding strategy, support OverloadShedder and ThresholdShedder,
> default is OverloadShedder
>
> loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
>
> # The broker resource usage threshold.
> # When the broker resource usage is greater than the pulsar cluster average
> resource usage,
> # the threshold shedder will be triggered to offload bundles from the
> broker.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBrokerThresholdShedderPercentage=10
>
> # When calculating new resource usage, the history usage accounts for.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerHistoryResourcePercentage=0.9
>
> # The BandWithIn usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithInResourceWeight=1.0
>
> # The BandWithOut usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBandwithOutResourceWeight=1.0
>
> # The CPU usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerCPUResourceWeight=1.0
>
> # The heap memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerMemoryResourceWeight=0.1
>
> # The direct memory usage weight when calculating new resource usage.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerDirectMemoryResourceWeight=0.1
>
> # Bundle unload minimum throughput threshold (MB), avoiding bundle unload
> frequently.
> # It only takes effect in the ThresholdShedder strategy.
> loadBalancerBundleUnloadMinThroughputThreshold=0.1
> ```
> The following screenshots are the status of the cluster:
> Problem 1. Load balancing took a long time 10 hours and over 400 times, and
> it has been unloading if there is a large traffic.
> <img width="1247" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341641-b85f8258-e973-4b14-8875-16be573dcbda.png
> ">
> <img width="1245" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341676-ed81b465-10fc-4051-8353-42e6306d4210.png
> ">
>
> Problem 2. The effect of cpu balancing is poor.
> <img width="1247" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341746-d3b28234-11ef-48c4-9f91-2fdf7bcde74b.png
> ">
> <img width="1246" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341792-b77a0691-b402-4fa0-a7aa-ac15c890613a.png
> ">
>
> The load shedding strategy `ThresholdShedder` work well, but not the bundle
> placement strategyLeastLongTermMessageRate .
> There are 3 possible reasons for the problems.
> 1. Although the cluster has many brokers with low load, there are fewer
> brokers to be considered for assignment.
> <img width="1168" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176341873-6da69749-3c1d-49cf-9e83-b942a8327db0.png
> ">
>
> Some brokers with lower load but more bundles can not be candidate due to
> distributing bundles evenly in LoadManager by force. Most of brokers are
> filtered out by the strategy, only 1 or 2 brokers can be candidate in the
> total 136 brokers . It was fixed by #16059
>
> 2. The memory usage of Java programs fluctuates widely, so that the maximum
> resource usage calculated is based on memory usage most of the time, which
> filters out brokers with low CPU load. Below is the sample of two brokers
> jvm memory usage in the cluster.
> <img width="1249" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176342043-f88f875d-5479-4132-a3f1-f9c053f3b7cb.png
> ">
> If the broker is overload, it will get highest score, which prevents it
> from being a candidate.
> <img width="1059" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176342107-179489e9-40b3-47b9-8158-f0e30fc037e4.png
> ">
>
> 3. The bundle placement strategy is `LeastLongTermMessageRate`, which
> selects a broker based on which one has the least long term message rate
> instead of load metric. The `LeastLongTermMessageRate` does not get along
> with `ThresholdShedder` well. Therefore, a load-based bundle placement
> strategy is necessary to cooperate with `ThresholdShedder`.
>
> ### Current implementation details
> The `ThresholdShedder` strategy that unloads any broker that exceeds the
> average resource utilization of all brokers by a configured threshold. As a
> consequence, this strategy tends to distribute load among all brokers. It
> does this by first computing the average resource usage per broker for the
> whole cluster. The resource usage for each broker is calculated using the
> following method: `LocalBrokerData#getMaxResourceUsageWithWeight`). The
> weights for each resource are configurable. Historical observations are
> included in the running average based on the broker's setting for
> loadBalancerHistoryResourcePercentage. Once the average resource usage is
> calculated, a broker's current/historical usage is compared to the average
> broker usage. If a broker's usage is greater than the average usage per
> broker plus the loadBalancerBrokerThresholdShedderPercentage, this load
> shedder proposes removing enough bundles to bring the unloaded broker 5%
> below the current average broker usage. Note that recently unloaded bundles
> are not unloaded again.
>
> ## Goal
> Develop a new load-based bundle placement strategy for better load
> balancing with fewer times, and less time, which cab achieve better
> teamwork with `ThresholdShedder`.
>
> ## API Changes
> No user-facing API changes are required.
>
> ## Implementation
> This should be a detailed description of all the changes that are
> expected to be made. It should be detailed enough that any developer that
> is
> familiar with Pulsar internals would be able to understand all the parts of
> the
> code changes for this proposal.
>
> This should also serve as documentation for any person that is trying to
> understand or debug the behavior of a certain feature.
>
> The main idea of the new strategy is to unify the requirement of load
> shedding strategy and bundle placement strategy, which consider the
> resource usage with weight, including historical observations.
>
> How to calculate a score for a broker ?
> - use its historical load and short-term load data with weight.
>
> How to select a broker for assignning bundle ?
> - select a broker based on which one has the least resource usage with
> weight.
>
> ### New configuration options
> The existing cache implementation will not be removed at this point. Users
> will
> be able to configure the old implementation in `broker.conf`.
> This option will be helpful in case of performance regressions would be
> seen for
> some use cases with the new strategy implementation.
> ```
> # load assignment strategy, support LeastLongTermMessageRate and
> LeastResourceUsageWithWeight, default is LeastLongTermMessageRate
>
> loadBalancerLoadAssignmentStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight
> ```
>
> Below are screenshots of the effect of the new strategy with less time and
> fewer load balancing times.
> <img width="1593" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176346492-f2ccdfda-b011-406d-88fe-df73d8bb839b.png
> ">
> <img width="1586" alt="image" src="
>
> https://user-images.githubusercontent.com/4970972/176346531-63a9b8b0-ef7b-4f74-a904-37d7c07c1793.png
> ">
>
> ## Reject Alternatives
> None yet.
>
> ## Reference
> [1] https://github.com/apache/pulsar/pull/16059
> [2] https://github.com/apache/pulsar/issues/16274
> [3] https://github.com/apache/pulsar/pull/16281
>
> --
> BR,
> Qiang Huang
>