You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/05 14:25:06 UTC

[pulsar] branch master updated: [PIP-169] Support split bundle by flow or qps (#16557)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eb16cf02c5 [PIP-169] Support split bundle by flow or qps (#16557)
8eb16cf02c5 is described below

commit 8eb16cf02c5b396554ba812de15db8911907695a
Author: LinChen <15...@qq.com>
AuthorDate: Mon Sep 5 22:24:56 2022 +0800

    [PIP-169] Support split bundle by flow or qps (#16557)
    
    Fixes https://github.com/apache/pulsar/issues/16782
    
    ### Motivation
    As we all know, Bundle split has 3 algorithms:
    
    - range_equally_divide
    - topic_count_equally_divide
    - specified_positions_divide
    
    However, none of these algorithms can divide bundles according to flow or qps, which may cause bundles to be split multiple times.
---
 conf/broker.conf                                   |   2 +-
 deployment/terraform-ansible/templates/broker.conf |   2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  15 +-
 .../pulsar/broker/namespace/NamespaceService.java  |  16 +-
 .../pulsar/broker/service/BrokerService.java       |  17 ++
 ...FlowOrQpsEquallyDivideBundleSplitAlgorithm.java | 129 ++++++++++++++
 .../FlowOrQpsEquallyDivideBundleSplitOption.java   |  46 +++++
 .../naming/NamespaceBundleSplitAlgorithm.java      |   6 +-
 ...OrQpsEquallyDivideBundleSplitAlgorithmTest.java | 194 +++++++++++++++++++++
 .../naming/NamespaceBundleSplitAlgorithmTest.java  |   7 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |   3 +-
 11 files changed, 428 insertions(+), 9 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 6f51c96a008..67655674203 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1225,7 +1225,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
 # Supported algorithms name for namespace bundle split.
 # "range_equally_divide" divides the bundle into two parts with the same hash range size.
 # "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
-supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
+supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide,flow_or_qps_equally_divide
 
 # Default algorithm name for namespace bundle split
 defaultNamespaceBundleSplitAlgorithm=range_equally_divide
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 5075b600b25..eb81a284e73 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -931,7 +931,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
 # Supported algorithms name for namespace bundle split.
 # "range_equally_divide" divides the bundle into two parts with the same hash range size.
 # "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
-supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
+supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide,flow_or_qps_equally_divide
 
 # Default algorithm name for namespace bundle split
 defaultNamespaceBundleSplitAlgorithm=range_equally_divide
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fffa57ff4ab..30ca29faad8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2160,6 +2160,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;
 
+
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "In FlowOrQpsEquallyDivideBundleSplitAlgorithm,"
+                    + " if msgRate >= loadBalancerNamespaceBundleMaxMsgRate * "
+                    + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0 "
+                    + " or throughput >=  loadBalancerNamespaceBundleMaxBandwidthMbytes * "
+                    + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0, "
+                    + " execute split bundle"
+    )
+    private int flowOrQpsDifferenceThresholdPercentage = 10;
+
     @FieldContext(
             dynamic = true,
             category = CATEGORY_LOAD_BALANCER,
@@ -2325,7 +2338,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Supported algorithms name for namespace bundle split"
     )
     private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide",
-            "topic_count_equally_divide", "specified_positions_divide");
+            "topic_count_equally_divide", "specified_positions_divide", "flow_or_qps_equally_divide");
     @FieldContext(
         dynamic = true,
         category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4b8d02da002..3467d25777e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
 import org.apache.pulsar.common.lookup.GetTopicsResult;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.BundleSplitOption;
+import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -87,6 +88,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -828,7 +830,19 @@ public class NamespaceService implements AutoCloseable {
                                        CompletableFuture<Void> completionFuture,
                                        NamespaceBundleSplitAlgorithm splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        BundleSplitOption bundleSplitOption;
+        if (config.getDefaultNamespaceBundleSplitAlgorithm()
+                  .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
+            Map<String, TopicStatsImpl> topicStatsMap =  pulsar.getBrokerService().getTopicStats(bundle);
+            bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
+                    topicStatsMap,
+                    config.getLoadBalancerNamespaceBundleMaxMsgRate(),
+                    config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
+                    config.getFlowOrQpsDifferenceThresholdPercentage());
+        } else {
+            bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        }
+
         splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
             CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
             if (ex == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 35a5e8ad34d..7c0afdb712a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -462,6 +462,23 @@ public class BrokerService implements Closeable {
         return bootstrap;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = getMultiLayerTopicMap()
+                .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {
+                    return ConcurrentOpenHashMap
+                            .<String, ConcurrentOpenHashMap<String, Topic>>newBuilder().build();
+                }).computeIfAbsent(bundle.toString(), k -> {
+                    return ConcurrentOpenHashMap.<String, Topic>newBuilder().build();
+                });
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        topicMap.forEach((name, topic) -> {
+            topicStatsMap.put(name,
+                    topic.getStats(false, false, false));
+        });
+        return topicStatsMap;
+    }
+
     public void start() throws Exception {
         this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
                 PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java
new file mode 100644
index 00000000000..010d725c5ea
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicHashList.add(hashCode);
+                topicInfoMap.put(hashCode, new TopicInfo(topic, msgRate, throughput));
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicInfoMap.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+            Collections.sort(topicHashList);
+
+
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = topicInfoMap.get(topicHashList.get(0)).msgRate;
+            double bundleThroughputTmp = topicInfoMap.get(topicHashList.get(0)).throughput;
+
+            for (int i = 1; i < topicHashList.size(); i++) {
+                long topicHashCode = topicHashList.get(i);
+                double msgRate = topicInfoMap.get(topicHashCode).msgRate;
+                double throughput = topicInfoMap.get(topicHashCode).throughput;
+
+                if ((bundleMsgRateTmp + msgRate) > loadBalancerNamespaceBundleMaxMsgRate
+                        || (bundleThroughputTmp + throughput) > loadBalancerNamespaceBundleMaxBandwidthBytes) {
+                    long splitStart = topicHashList.get(i - 1);
+                    long splitEnd = topicHashList.get(i);
+                    long splitMiddle = splitStart + (splitEnd - splitStart) / 2 + 1;
+                    splitResults.add(splitMiddle);
+
+                    bundleMsgRateTmp = msgRate;
+                    bundleThroughputTmp = throughput;
+                } else {
+                    bundleMsgRateTmp += msgRate;
+                    bundleThroughputTmp += throughput;
+                }
+            }
+
+            return CompletableFuture.completedFuture(splitResults);
+        });
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java
new file mode 100644
index 00000000000..eba043ef290
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+@Getter
+public class FlowOrQpsEquallyDivideBundleSplitOption extends BundleSplitOption {
+    private Map<String, TopicStatsImpl> topicStatsMap;
+    private int loadBalancerNamespaceBundleMaxMsgRate;
+    private int loadBalancerNamespaceBundleMaxBandwidthMbytes;
+    private int flowOrQpsDifferenceThresholdPercentage;
+
+    public FlowOrQpsEquallyDivideBundleSplitOption(NamespaceService namespaceService, NamespaceBundle bundle,
+                                                   List<Long> boundaries,
+                                                   Map<String, TopicStatsImpl> topicStatsMap,
+                                                   int loadBalancerNamespaceBundleMaxMsgRate,
+                                                   int loadBalancerNamespaceBundleMaxBandwidthMbytes,
+                                                   int flowOrQpsDifferenceThresholdPercentage) {
+        super(namespaceService, bundle, boundaries);
+        this.topicStatsMap = topicStatsMap;
+        this.loadBalancerNamespaceBundleMaxMsgRate = loadBalancerNamespaceBundleMaxMsgRate;
+        this.loadBalancerNamespaceBundleMaxBandwidthMbytes = loadBalancerNamespaceBundleMaxBandwidthMbytes;
+        this.flowOrQpsDifferenceThresholdPercentage = flowOrQpsDifferenceThresholdPercentage;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index a4072ddc20e..6c54483244c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -30,14 +30,16 @@ public interface NamespaceBundleSplitAlgorithm {
     String RANGE_EQUALLY_DIVIDE_NAME = "range_equally_divide";
     String TOPIC_COUNT_EQUALLY_DIVIDE = "topic_count_equally_divide";
     String SPECIFIED_POSITIONS_DIVIDE = "specified_positions_divide";
+    String FLOW_OR_QPS_EQUALLY_DIVIDE = "flow_or_qps_equally_divide";
 
     List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
-            TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_POSITIONS_DIVIDE);
+            TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_POSITIONS_DIVIDE, FLOW_OR_QPS_EQUALLY_DIVIDE);
 
     NamespaceBundleSplitAlgorithm RANGE_EQUALLY_DIVIDE_ALGO = new RangeEquallyDivideBundleSplitAlgorithm();
     NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm();
     NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
             new SpecifiedPositionsBundleSplitAlgorithm();
+    NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
 
     static NamespaceBundleSplitAlgorithm of(String algorithmName) {
         if (algorithmName == null) {
@@ -50,6 +52,8 @@ public interface NamespaceBundleSplitAlgorithm {
                 return TOPIC_COUNT_EQUALLY_DIVIDE_ALGO;
             case SPECIFIED_POSITIONS_DIVIDE:
                 return SPECIFIED_POSITIONS_DIVIDE_ALGO;
+            case FLOW_OR_QPS_EQUALLY_DIVIDE:
+                return FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO;
             default:
                 return null;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java
new file mode 100644
index 00000000000..c7c1fd10263
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.hash.Hashing;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertTrue;
+
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithmTest {
+
+    @Test
+    public void testSplitBundleByFlowOrQps() {
+        FlowOrQpsEquallyDivideBundleSplitAlgorithm algorithm = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
+        int loadBalancerNamespaceBundleMaxMsgRate = 1010;
+        int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
+        int flowOrQpsDifferenceThresholdPercentage = 10;
+
+        Map<Long, Double> hashAndMsgMap = new HashMap<>();
+        Map<Long, Double> hashAndThroughput = new HashMap<>();
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        List<String> mockTopics = new ArrayList<>();
+        List<Long> hashList = new ArrayList<>();
+
+        for (int i = 1; i < 6; i++) {
+            String topicName = "persistent://test-tenant1/test-namespace1/test" + i;
+            for (int j = 0; j < 20; j++) {
+                String tp = topicName + "-partition-" + j;
+                mockTopics.add(tp);
+                TopicStatsImpl topicStats = new TopicStatsImpl();
+                topicStats.msgRateIn = 24.5;
+                topicStats.msgThroughputIn = 1000;
+                topicStats.msgRateOut = 25;
+                topicStats.msgThroughputOut = 1000;
+                topicStatsMap.put(tp, topicStats);
+            }
+        }
+
+
+        for (int i = 6; i < 13; i++) {
+            String topicName = "persistent://test-tenant1/test-namespace1/test" + i;
+            for (int j = 0; j < 20; j++) {
+                String tp = topicName + "-partition-" + j;
+                mockTopics.add(tp);
+                TopicStatsImpl topicStats = new TopicStatsImpl();
+                topicStats.msgRateIn = 25.5;
+                topicStats.msgThroughputIn = 1000;
+                topicStats.msgRateOut = 25;
+                topicStats.msgThroughputOut = 1000;
+                topicStatsMap.put(tp, topicStats);
+            }
+        }
+
+        String tp = "persistent://test-tenant1/test-namespace1/test695-partition-0";
+        mockTopics.add(tp);
+        TopicStatsImpl topicStats = new TopicStatsImpl();
+        topicStats.msgRateIn = 25;
+        topicStats.msgThroughputIn = 1000;
+        topicStats.msgRateOut = 35;
+        topicStats.msgThroughputOut = 1000;
+        topicStatsMap.put(tp, topicStats);
+
+        // -- do test
+        NamespaceService mockNamespaceService = mock(NamespaceService.class);
+        NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+        doReturn(CompletableFuture.completedFuture(mockTopics))
+                .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+        NamespaceBundleFactory mockNamespaceBundleFactory = mock(NamespaceBundleFactory.class);
+        doReturn(mockNamespaceBundleFactory)
+                .when(mockNamespaceBundle).getNamespaceBundleFactory();
+        mockTopics.forEach((topic) -> {
+            long hashValue = Hashing.crc32().hashString(topic, UTF_8).padToLong();
+            doReturn(hashValue)
+                    .when(mockNamespaceBundleFactory).getLongHashCode(topic);
+            hashList.add(hashValue);
+            hashAndMsgMap.put(hashValue, topicStatsMap.get(topic).msgRateIn
+                    + topicStatsMap.get(topic).msgRateOut);
+            hashAndThroughput.put(hashValue, topicStatsMap.get(topic).msgThroughputIn
+                    + topicStatsMap.get(topic).msgThroughputOut);
+        });
+
+        List<Long> splitPositions = algorithm.getSplitBoundary(new FlowOrQpsEquallyDivideBundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+                null, topicStatsMap, loadBalancerNamespaceBundleMaxMsgRate,
+                loadBalancerNamespaceBundleMaxBandwidthMbytes, flowOrQpsDifferenceThresholdPercentage)).join();
+
+        Collections.sort(hashList);
+        int i = 0;
+        for (Long position : splitPositions) {
+            Long endPosition = position;
+            double bundleMsgRateTmp = 0;
+            double bundleThroughputTmp = 0;
+            while (hashList.get(i) < endPosition) {
+                bundleMsgRateTmp += hashAndMsgMap.get(hashList.get(i));
+                bundleThroughputTmp += hashAndThroughput.get(hashList.get(i));
+                i++;
+            }
+            assertTrue(bundleMsgRateTmp < loadBalancerNamespaceBundleMaxMsgRate);
+            assertTrue(bundleThroughputTmp < loadBalancerNamespaceBundleMaxBandwidthMbytes * 1024 * 1024);
+        }
+    }
+
+
+    @Test
+    public void testFirstPositionIsOverLoad() {
+        FlowOrQpsEquallyDivideBundleSplitAlgorithm algorithm = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
+        int loadBalancerNamespaceBundleMaxMsgRate = 1010;
+        int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
+        int flowOrQpsDifferenceThresholdPercentage = 10;
+        int topicNum = 5;
+
+        List<String> mockTopics = new ArrayList<>();
+        List<Long> topicHashList = new ArrayList<>(topicNum);
+        Map<Long, String> hashAndTopic = new HashMap<>();
+
+        for (int i = 0; i < topicNum; i++) {
+            String topicName = "persistent://test-tenant1/test-namespace1/test-partition-" + i;
+            mockTopics.add(topicName);
+            long hashValue = Hashing.crc32().hashString(topicName, UTF_8).padToLong();
+            topicHashList.add(hashValue);
+            hashAndTopic.put(hashValue, topicName);
+        }
+        Collections.sort(topicHashList);
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+
+        long hashValue = topicHashList.get(0);
+        String topicName = hashAndTopic.get(hashValue);
+        TopicStatsImpl topicStats0 = new TopicStatsImpl();
+        topicStats0.msgRateIn = 1000;
+        topicStats0.msgThroughputIn = 1000;
+        topicStats0.msgRateOut = 1000;
+        topicStats0.msgThroughputOut = 1000;
+        topicStatsMap.put(topicName, topicStats0);
+
+        for (int i = 1; i < topicHashList.size(); i++) {
+            hashValue = topicHashList.get(i);
+            topicName = hashAndTopic.get(hashValue);
+            TopicStatsImpl topicStats = new TopicStatsImpl();
+            topicStats.msgRateIn = 24.5;
+            topicStats.msgThroughputIn = 1000;
+            topicStats.msgRateOut = 25;
+            topicStats.msgThroughputOut = 1000;
+            topicStatsMap.put(topicName, topicStats);
+        }
+
+        // -- do test
+        NamespaceService mockNamespaceService = mock(NamespaceService.class);
+        NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+        doReturn(CompletableFuture.completedFuture(mockTopics))
+                .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+        NamespaceBundleFactory mockNamespaceBundleFactory = mock(NamespaceBundleFactory.class);
+        doReturn(mockNamespaceBundleFactory)
+                .when(mockNamespaceBundle).getNamespaceBundleFactory();
+        mockTopics.forEach((topic) -> {
+            long hash = Hashing.crc32().hashString(topic, UTF_8).padToLong();
+            doReturn(hash)
+                    .when(mockNamespaceBundleFactory).getLongHashCode(topic);
+        });
+        List<Long> splitPositions = algorithm.getSplitBoundary(new FlowOrQpsEquallyDivideBundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+                null, topicStatsMap, loadBalancerNamespaceBundleMaxMsgRate,
+                loadBalancerNamespaceBundleMaxBandwidthMbytes, flowOrQpsDifferenceThresholdPercentage)).join();
+
+        long splitStart = topicHashList.get(0);
+        long splitEnd = topicHashList.get(1);
+        long splitMiddle = splitStart + (splitEnd - splitStart) / 2 + 1;
+        assertTrue(splitPositions.get(0) == splitMiddle);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
index a88f8081aef..75f147e2829 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
@@ -19,12 +19,11 @@
 package org.apache.pulsar.common.naming;
 
 
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME;
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE;
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.*;
+
 
 public class NamespaceBundleSplitAlgorithmTest {
 
@@ -40,5 +39,7 @@ public class NamespaceBundleSplitAlgorithmTest {
         Assert.assertTrue(topicCountEquallyDivide instanceof TopicCountEquallyDivideBundleSplitAlgorithm);
         NamespaceBundleSplitAlgorithm specifiedTopicCountEquallyDivide = NamespaceBundleSplitAlgorithm.of(SPECIFIED_POSITIONS_DIVIDE);
         Assert.assertTrue(specifiedTopicCountEquallyDivide instanceof SpecifiedPositionsBundleSplitAlgorithm);
+        NamespaceBundleSplitAlgorithm flowOrQPSEquallyDivide = NamespaceBundleSplitAlgorithm.of(FLOW_OR_QPS_EQUALLY_DIVIDE);
+        Assert.assertTrue(flowOrQPSEquallyDivide instanceof FlowOrQpsEquallyDivideBundleSplitAlgorithm);
     }
 }
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index c89bb152fc1..323b47a2359 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -906,7 +906,8 @@ public class CmdNamespaces extends CmdBase {
 
         @Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split "
                 + "namespace bundle. Valid options are: [range_equally_divide, topic_count_equally_divide, "
-                + "specified_positions_divide]. Use broker side config if absent", required = false)
+                + "specified_positions_divide, flow_or_qps_equally_divide]. Use broker side config if absent"
+                , required = false)
         private String splitAlgorithmName;
 
         @Parameter(names = { "--split-boundaries",