Posted to dev@pulsar.apache.org by lordcheng10 <15...@qq.com.INVALID> on 2022/08/22 07:53:04 UTC

Re: [Discuss][PIP-164] Support split bundle by flow or qps

The implementation logic has been modified, and the corresponding example has been updated accordingly:


## Motivation
As we all know, Bundle split has 3 algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide


However, none of these algorithms can split bundles according to flow or QPS, which may cause a bundle to be split multiple times before its load is brought under the limits.


## Goal
Our goal is to split bundles according to flow or QPS, so we propose a PIP to introduce a split algorithm based on flow or QPS.
The main idea is that we can get the flow or QPS information of each topic contained in a bundle,
and then split according to the loadBalancerNamespaceBundleMaxMsgRate or loadBalancerNamespaceBundleMaxBandwidthMbytes configuration.


For example, there is a bundle with boundaries 0x00000000 to 0x00000200 and six topics: t1, t2, t3, t4, t5, t6.
loadBalancerNamespaceBundleMaxMsgRate=1100
loadBalancerNamespaceBundleMaxBandwidthMbytes=110


**Step 1: Get each topic's hash position and its corresponding flow and QPS:**


> t1 with hashcode 10 msgRate 100/s throughput 10M/s
>
> t2 with hashcode 20 msgRate 200/s throughput 20M/s
>
> t3 with hashcode 80 msgRate 300/s throughput 30M/s
>
> t4 with hashcode 90 msgRate 400/s throughput 40M/s
>
> t5 with hashcode 100 msgRate 500/s throughput 50M/s
>
> t6 with hashcode 110 msgRate 600/s throughput 60M/s




**Step 2: Calculate the total flow and QPS of the bundle:**


> bundleMsgRate = 100 + 200 + 300 + 400 + 500 + 600 = 2100
> bundleThroughput = 10 + 20 + 30 + 40 + 50 + 60 = 210MB


**Step 3: Calculate the split position and split:**


> QPS: (100 + 200 + 300 + 400) < loadBalancerNamespaceBundleMaxMsgRate=1100 & (100 + 200 + 300 + 400 + 500) > loadBalancerNamespaceBundleMaxMsgRate=1100
> flow: (10 + 20 + 30 + 40) < loadBalancerNamespaceBundleMaxBandwidthMbytes=110 & (10 + 20 + 30 + 40 + 50) > loadBalancerNamespaceBundleMaxBandwidthMbytes=110
>
> Split between t4 and t5:
> splitStartPosition = 90
> splitEndPosition = 100
> splitPosition = (90 + 100) / 2 = 95
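
To make the arithmetic concrete, here is a minimal, self-contained Java sketch of the example above. Only the QPS check is shown for brevity, and the class and variable names are illustrative only, not the broker implementation:

```
import java.util.Map;
import java.util.TreeMap;

public class SplitByRateExample {
    public static void main(String[] args) {
        int maxMsgRate = 1100; // loadBalancerNamespaceBundleMaxMsgRate
        // topic hash position -> msgRate, iterated in ascending hash order (values from the example)
        TreeMap<Long, Double> rateByHash = new TreeMap<>(Map.of(
                10L, 100.0, 20L, 200.0, 80L, 300.0,
                90L, 400.0, 100L, 500.0, 110L, 600.0));

        double accumulated = 0;
        Long previousHash = null;
        for (Map.Entry<Long, Double> e : rateByHash.entrySet()) {
            accumulated += e.getValue();
            if (accumulated > maxMsgRate && previousHash != null) {
                // split halfway between the last topic that fit and the topic that exceeded the limit
                long splitPosition = (previousHash + e.getKey()) / 2;
                System.out.println("split at " + splitPosition); // prints "split at 95"
                accumulated = e.getValue(); // the exceeding topic starts the next sub-range
            }
            previousHash = e.getKey();
        }
    }
}
```

Running it prints `split at 95`, matching the splitPosition computed above.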




## API Changes


1. Add a split algorithm class based on flow or QPS:


`public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm `


2. Update the default configuration:
```
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide", "topic_count_equally_divide",
        "specified_positions_divide", "flow_count_equally_divide");
```
3. Add a new configuration option:
```
@FieldContext(
        dynamic = true,
        category = CATEGORY_LOAD_BALANCER,
        doc = "Acceptable difference between qps and loadBalancerNamespaceBundleMaxMsgRate"
                + " or flow and loadBalancerNamespaceBundleMaxBandwidthMbytes"
)
private int flowOrQpsDifferenceThresholdPercentage = 10;
```
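
The exact way this threshold is applied is not spelled out above; the sketch below shows one plausible interpretation (the helper name and its semantics are assumptions, not the broker code): a sub-range only triggers a split once it exceeds the configured maximum by more than the threshold percentage.

```
public class ThresholdSketch {
    // Assumed semantics (illustration only): values within the threshold of the configured
    // maximum are still acceptable; only a larger overrun triggers a split.
    static boolean exceedsWithTolerance(double accumulated, double configuredMax, int thresholdPercentage) {
        double tolerated = configuredMax * (100.0 + thresholdPercentage) / 100.0;
        return accumulated > tolerated;
    }

    public static void main(String[] args) {
        // With loadBalancerNamespaceBundleMaxMsgRate = 1100 and a 10% threshold (tolerated up to 1210):
        System.out.println(exceedsWithTolerance(1150, 1100, 10)); // false - still within tolerance
        System.out.println(exceedsWithTolerance(1300, 1100, 10)); // true  - split
    }
}
```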




## Implementation
The execution steps of the FlowOrQpsEquallyDivideBundleSplitAlgorithm#getSplitBoundary method are as follows:
1. Get the hash position of each topic and the corresponding msgRate and msgThroughput, and sort them by position (a sketch of how these structures might be populated follows the code in step 2):


```
List<Long> topicNameHashList = new ArrayList<>(topics.size());
Map<Long, Double> hashAndMsgMap = new HashMap<>();
Map<Long, Double> hashAndThroughput = new HashMap<>();
```


2. Traverse all topics in ascending order of their hash position,
and split the bundle according to the configured loadBalancerNamespaceBundleMaxMsgRate or loadBalancerNamespaceBundleMaxBandwidthMbytes:


```
double bundleMsgRateTmp = 0;
double bundleThroughputTmp = 0;
for (int i = 0; i < topicNameHashList.size(); i++) {
    long topicHashCode = topicNameHashList.get(i);
    // accumulate the rate and throughput of the current sub-range
    bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
    bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);

    if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
            || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
        // split between the previous topic and the topic that exceeded the limit
        long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
        long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
        long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
        splitResults.add(splitMiddle);
        // start accumulating the next sub-range from the current topic
        bundleMsgRateTmp = hashAndMsgMap.get(topicHashCode);
        bundleThroughputTmp = hashAndThroughput.get(topicHashCode);
    }
}


```
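
For completeness, below is a hedged sketch of how the step-1 structures might be populated and sorted before the loop above runs. The TopicLoad carrier, the topicStats input, and the hashFunc helper are illustrative stand-ins, not the real broker APIs; the actual algorithm must reuse whatever per-topic statistics and hash function the bundle range itself uses.

```
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PopulateAndSortSketch {
    /** Illustrative per-topic statistics carrier (not a Pulsar class). */
    record TopicLoad(String topicName, double msgRateIn, double throughputIn) {}

    /** Placeholder for the hash the bundle range uses; the real algorithm must reuse that hash. */
    static long hashFunc(String topicName) {
        return Integer.toUnsignedLong(topicName.hashCode());
    }

    /** Builds the step-1 structures: a sorted hash list plus hash->rate and hash->throughput maps. */
    static List<Long> buildSortedHashList(List<TopicLoad> topicStats,
                                          Map<Long, Double> hashAndMsgMap,
                                          Map<Long, Double> hashAndThroughput) {
        List<Long> topicNameHashList = new ArrayList<>(topicStats.size());
        for (TopicLoad t : topicStats) {
            long hash = hashFunc(t.topicName());
            topicNameHashList.add(hash);
            hashAndMsgMap.put(hash, t.msgRateIn());
            hashAndThroughput.put(hash, t.throughputIn());
        }
        // step 2 expects the topics in ascending hash-position order
        Collections.sort(topicNameHashList);
        return topicNameHashList;
    }

    public static void main(String[] args) {
        List<TopicLoad> topicStats = List.of(
                new TopicLoad("persistent://tenant/ns/t1", 100, 10),
                new TopicLoad("persistent://tenant/ns/t2", 200, 20));
        Map<Long, Double> hashAndMsgMap = new HashMap<>();
        Map<Long, Double> hashAndThroughput = new HashMap<>();
        List<Long> sorted = buildSortedHashList(topicStats, hashAndMsgMap, hashAndThroughput);
        System.out.println(sorted); // hashes in ascending order, ready for the loop in step 2
    }
}
```

The only property the step-2 loop relies on is that the list is sorted in ascending order and that both maps are keyed by the same hash values.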

Re: [Discuss][PIP-164] Support split bundle by flow or qps

Posted by PengHui Li <pe...@apache.org>.
It's a good idea to improve the bundle split for the case where the traffic
of a topic doesn't change drastically.
Otherwise, we should not use this policy. Or can we use it for all cases?
I think it should be documented in the proposal.

I have some questions

- Do we need to consider the consumer rate?
- Is the `flow or qps` based on entries or on messages?
  IMO, basing it on entries and throughput is more reasonable. Usually, the
  entry rate affects the CPU usage, and the throughput affects the network usage.

Thanks,
Penghui
