Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/13 02:13:36 UTC

[GitHub] [pulsar] lordcheng10 opened a new pull request, #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

lordcheng10 opened a new pull request, #16557:
URL: https://github.com/apache/pulsar/pull/16557

   
   ### Motivation
   Add a bundle split algorithm based on flow (throughput) or QPS (message rate).
   
   The current bundle split algorithms are not flexible enough to split according to the actual traffic, so a single bundle can end up carrying a large share of the write traffic.
   The new MessageRateAndThroughputEquallyDivideBundleSplitAlgorithm algorithm automatically finds a position with moderate traffic to split at, reducing the number of ineffective splits.
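A minimal sketch of the idea, in plain Java with no Pulsar dependencies. It assumes each topic's load is already summarized as a message rate and a throughput keyed by a non-negative topic hash, and it mirrors the two steps visible in the diffs quoted in this thread: skip the split unless the bundle exceeds a limit by more than the difference threshold, then walk topics in hash order and cut before the topic that would push the current range over a limit. Placing the boundary at the midpoint between neighbouring hashes is an assumption (the quoted diff is cut off before that point), and `FlowOrQpsSplitSketch` and `Load` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the PR's implementation: split a bundle so that each
// resulting hash range stays under a message-rate limit and a throughput limit.
final class FlowOrQpsSplitSketch {

    // Per-topic load: messages/s (in + out) and bytes/s (in + out).
    record Load(double msgRate, double throughput) {}

    // Keys are assumed to be non-negative topic hashes (e.g. in the 32-bit hash range).
    static List<Long> splitBoundaries(Map<Long, Load> loadByHash, double maxMsgRate,
                                      double maxThroughput, double diffThreshold) {
        List<Long> boundaries = new ArrayList<>();
        if (loadByHash.size() < 2) {
            return boundaries; // a single topic cannot be spread across bundles
        }
        double totalRate = 0;
        double totalThroughput = 0;
        for (Load load : loadByHash.values()) {
            totalRate += load.msgRate();
            totalThroughput += load.throughput();
        }
        // Tolerate overshoot up to diffThreshold (e.g. 0.1 = 10%) before splitting at all.
        if (totalRate < maxMsgRate * (1 + diffThreshold)
                && totalThroughput < maxThroughput * (1 + diffThreshold)) {
            return boundaries;
        }
        // Walk topics in ascending hash order; cut before a topic that would push
        // the current range over either limit.
        Long prevHash = null;
        double accRate = 0;
        double accThroughput = 0;
        for (Map.Entry<Long, Load> entry : new TreeMap<>(loadByHash).entrySet()) {
            long hash = entry.getKey();
            Load load = entry.getValue();
            if (prevHash != null
                    && (accRate + load.msgRate() > maxMsgRate
                        || accThroughput + load.throughput() > maxThroughput)) {
                boundaries.add(prevHash + (hash - prevHash) / 2); // assumed: midpoint
                accRate = 0;
                accThroughput = 0;
            }
            accRate += load.msgRate();
            accThroughput += load.throughput();
            prevHash = hash;
        }
        return boundaries;
    }
}
```

For example, four topics at hashes 100, 200, 300 and 400, each at 60 msg/s, with a 100 msg/s limit and a 10% threshold, yield boundaries [150, 250, 350]: every range holds exactly one topic. A topic that alone exceeds a limit simply ends up in its own range; the sketch does not try to split it further.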
   
   
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16557:
URL: https://github.com/apache/pulsar/pull/16557




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1228301450

   @Technoboy- @codelipenghui @HQebupt PTAL, thanks!




[GitHub] [pulsar] codelipenghui commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1182739136

   @lordcheng10 For new features and APIs, we should start with a proposal first. Please follow the proposal guide: https://github.com/apache/pulsar/wiki/Pulsar-Improvement-Proposal-%28PIP%29




[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962942860


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   I tried to calculate the entry rate using `org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl#chunkedMessageRate` and `org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl#chunkedMessageRate`.
   
   @codelipenghui 





[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r960468128


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +824,34 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {
+            return ConcurrentOpenHashMap
+                    .<String, ConcurrentOpenHashMap<String, Topic>>newBuilder().build();
+        }).computeIfAbsent(bundle.toString(), k -> {
+            return ConcurrentOpenHashMap.<String, Topic>newBuilder().build();
+        });
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        topicMap.forEach((name, topic) -> {
+            topicStatsMap.put(name,
+                    topic.getStats(false, false, false));
+        });
+        return topicStatsMap;
+    }
+
     void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> completionFuture,
                                        NamespaceBundleSplitAlgorithm splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        Map<String, TopicStatsImpl> topicStatsMap = getTopicStats(bundle);

Review Comment:
   fixed.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +824,34 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {
+            return ConcurrentOpenHashMap
+                    .<String, ConcurrentOpenHashMap<String, Topic>>newBuilder().build();
+        }).computeIfAbsent(bundle.toString(), k -> {
+            return ConcurrentOpenHashMap.<String, Topic>newBuilder().build();
+        });
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        topicMap.forEach((name, topic) -> {
+            topicStatsMap.put(name,
+                    topic.getStats(false, false, false));
+        });
+        return topicStatsMap;
+    }
+
     void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> completionFuture,
                                        NamespaceBundleSplitAlgorithm splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        Map<String, TopicStatsImpl> topicStatsMap = getTopicStats(bundle);
+        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries, topicStatsMap,

Review Comment:
   fixed





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1229681406

   The vote has passed (vote thread: https://lists.apache.org/thread/g6stjsrymmq3850pdt9whzgx91tx33v8). PTAL, thanks! @codelipenghui @Technoboy- @eolivelli 




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1231121336

   ping




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1235160784

   @eolivelli PTAL, thanks!




[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962840017


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   I'm not sure: is there currently a statistic for the entry rate?





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223501977

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1182990492

   > @lordcheng10 For new features and APIs, we should start with a proposal first. Please follow the proposal guide: https://github.com/apache/pulsar/wiki/Pulsar-Improvement-Proposal-%28PIP%29
   
   OK




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1187561157

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1236763124

   @aloyszhang @Technoboy- PTAL, thanks!




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1194150553

   > @lordcheng10 For new features and APIs, we should start with a proposal first. Please follow the proposal guide: https://github.com/apache/pulsar/wiki/Pulsar-Improvement-Proposal-%28PIP%29
   
   PIP:   https://github.com/apache/pulsar/issues/16782
   discuss thread:  https://lists.apache.org/thread/cshyt10fwcjjxs93g8yf0svgwcgnshmg
   
   @codelipenghui 




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1183922536

   
   /pulsarbot run-failure-checks
   
   




[GitHub] [pulsar] Jason918 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r960493224


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicHashList.add(hashCode);
+                topicInfoMap.put(hashCode, new TopicInfo(topic, msgRate, throughput));
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicInfoMap.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+            Collections.sort(topicHashList);
+
+
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = topicInfoMap.get(topicHashList.get(0)).msgRate;
+            double bundleThroughputTmp = topicInfoMap.get(topicHashList.get(0)).throughput;
+
+            for (int i = 1; i < topicHashList.size(); i++) {
+                long topicHashCode = topicHashList.get(i);
+                double msgRate = topicInfoMap.get(topicHashCode).msgRate;
+                double throughput = topicInfoMap.get(topicHashCode).throughput;
+
+                if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
+                        || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes
+                        || (bundleMsgRateTmp + msgRate) > loadBalancerNamespaceBundleMaxMsgRate

Review Comment:
   `(bundleMsgRateTmp + msgRate) > loadBalancerNamespaceBundleMaxMsgRate` already covers `bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate` (since `msgRate` is never negative), so the latter check can be removed.
   
   The same applies to `bundleThroughputTmp`.
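A small self-contained check of this suggestion, reduced to a single metric. It relies on the assumption that the per-topic `msgRate` and `throughput` are never negative (they are sums of in/out rates); with a negative increment the two forms would differ. `CutConditionCheck` and its methods are illustrative names only.

```java
// Hypothetical check of the review point: with non-negative increments, the standalone
// "accumulated > max" test never changes the result of "accumulated + next > max".
final class CutConditionCheck {

    // Shape of the original condition, for one metric.
    static boolean original(double accumulated, double next, double max) {
        return accumulated > max || accumulated + next > max;
    }

    // Simplified condition suggested in the review.
    static boolean simplified(double accumulated, double next, double max) {
        return accumulated + next > max;
    }

    public static void main(String[] args) {
        double[] samples = {0, 1, 49.5, 50, 99, 100, 100.5, 1e6};
        for (double accumulated : samples) {
            for (double next : samples) {
                if (original(accumulated, next, 100) != simplified(accumulated, next, 100)) {
                    throw new AssertionError(accumulated + " + " + next);
                }
            }
        }
        System.out.println("equivalent on all non-negative samples");
    }
}
```

The equivalence holds because adding a non-negative value can only keep or increase the sum, so whenever `accumulated > max` is true, `accumulated + next > max` is true as well.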





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1235106288

   /pulsarbot run-failure-checks




[GitHub] [pulsar] eolivelli commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r960482131


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +825,42 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {

Review Comment:
   We should modify the multiLayerTopicMap returned by the BrokerService.
   The code that accesses this data structure should live inside BrokerService; otherwise we are breaking encapsulation.
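One way to follow this suggestion, sketched with plain `java.util.concurrent.ConcurrentHashMap` standing in for Pulsar's `ConcurrentOpenHashMap`: the owner of the nested namespace → bundle → topic map exposes a read-only lookup, so a caller such as `NamespaceService` never touches the map directly, and looking up an unknown bundle does not insert empty entries (which the `computeIfAbsent` calls in the quoted diff would do). `TopicRegistry`, `Stats`, and the method names are hypothetical, not BrokerService's actual API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BrokerService: it owns the namespace -> bundle -> topic map
// and hands out only read-only snapshots of per-topic stats.
final class TopicRegistry {

    // Hypothetical per-topic stats.
    record Stats(double msgRate, double throughput) {}

    private final Map<String, Map<String, Map<String, Stats>>> multiLayerTopics =
            new ConcurrentHashMap<>();

    // Writes stay inside the owning class.
    void register(String namespace, String bundle, String topic, Stats stats) {
        multiLayerTopics
                .computeIfAbsent(namespace, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(bundle, k -> new ConcurrentHashMap<>())
                .put(topic, stats);
    }

    // Read-only lookup: getOrDefault never inserts, and the copy cannot be modified.
    Map<String, Stats> getTopicStats(String namespace, String bundle) {
        Map<String, Stats> topics = multiLayerTopics
                .getOrDefault(namespace, Map.of())
                .getOrDefault(bundle, Map.of());
        return Collections.unmodifiableMap(new HashMap<>(topics));
    }

    int namespaceCount() {
        return multiLayerTopics.size();
    }
}
```

Returning a copy costs an allocation per call, but it keeps the split algorithm working on a stable snapshot while topics are added or removed concurrently.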





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1236057426

   @Technoboy- PTAL, thanks!




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962958048


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   OK, I see. This could be improved in a separate PR or PIP. It is not a blocker for this PR.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962828995


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   The related discussion: https://lists.apache.org/thread/df5v1q1xw1lo1y6n2wf3hp5zjoky0vmz





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962823480


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   I think this one is based on the message rate, not the entry rate, right?
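With producer batching, a single entry can carry many messages, so the two rates can diverge widely. A small hypothetical illustration follows; the class, fields, and numbers are made up for the example.

```java
// Hypothetical illustration of message rate vs. entry rate under batching.
final class RateComparison {

    static final class Sample {
        final double entryRate;           // entries written per second
        final double avgMessagesPerEntry; // average batch size

        Sample(double entryRate, double avgMessagesPerEntry) {
            this.entryRate = entryRate;
            this.avgMessagesPerEntry = avgMessagesPerEntry;
        }

        // Message rate implied by the entry rate and the average batch size.
        double msgRate() {
            return entryRate * avgMessagesPerEntry;
        }
    }

    private RateComparison() {}
}
```

An unbatched topic at 1000 entries/s and a batched topic at 10 entries/s with 100 messages per entry both report 1000 msg/s, even though the first writes 100 times as many entries. A message-rate threshold treats them the same.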





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223417008

   @Technoboy- @codelipenghui @eolivelli PTAL, thanks!




[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r960524551


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicHashList.add(hashCode);
+                topicInfoMap.put(hashCode, new TopicInfo(topic, msgRate, throughput));
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicInfoMap.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+            Collections.sort(topicHashList);
+
+
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = topicInfoMap.get(topicHashList.get(0)).msgRate;
+            double bundleThroughputTmp = topicInfoMap.get(topicHashList.get(0)).throughput;
+
+            for (int i = 1; i < topicHashList.size(); i++) {
+                long topicHashCode = topicHashList.get(i);
+                double msgRate = topicInfoMap.get(topicHashCode).msgRate;
+                double throughput = topicInfoMap.get(topicHashCode).throughput;
+
+                if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
+                        || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes
+                        || (bundleMsgRateTmp + msgRate) > loadBalancerNamespaceBundleMaxMsgRate

Review Comment:
   Fixed
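The `FlowOrQpsEquallyDivideBundleSplitAlgorithm` hunk above stops partway through the split loop. Below is a self-contained sketch of the greedy approach it appears to implement. It rests on several assumptions:

- Topics are reduced to a hypothetical `TopicLoad` holding a hash, a message rate, and a throughput.
- The truncated throughput cut condition mirrors the message-rate one.
- Each boundary is placed between two adjacent topic hashes.

Unlike the PR, the sketch returns an empty list rather than `null` when no split is needed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, self-contained sketch of a greedy "equally divide by load" split.
final class GreedyBundleSplitSketch {

    // Simplified per-topic load: position in the bundle hash range plus rates.
    static final class TopicLoad {
        final long hash;
        final double msgRate;    // msgRateIn + msgRateOut
        final double throughput; // bytes/s in + out

        TopicLoad(long hash, double msgRate, double throughput) {
            this.hash = hash;
            this.msgRate = msgRate;
            this.throughput = throughput;
        }
    }

    private GreedyBundleSplitSketch() {}

    static List<Long> splitBoundaries(List<TopicLoad> input, double maxMsgRate,
                                      double maxThroughput, double diffThreshold) {
        List<TopicLoad> topics = new ArrayList<>();
        double totalRate = 0;
        double totalThroughput = 0;
        for (TopicLoad t : input) {
            if (t.msgRate <= 0 && t.throughput <= 0) {
                continue; // skip idle topics
            }
            topics.add(t);
            totalRate += t.msgRate;
            totalThroughput += t.throughput;
        }
        List<Long> boundaries = new ArrayList<>();
        // Only split when the bundle exceeds a limit by more than the tolerance.
        if (topics.size() < 2
                || (totalRate < maxMsgRate * (1 + diffThreshold)
                && totalThroughput < maxThroughput * (1 + diffThreshold))) {
            return boundaries;
        }
        topics.sort(Comparator.comparingLong(t -> t.hash));

        double accRate = topics.get(0).msgRate;
        double accThroughput = topics.get(0).throughput;
        for (int i = 1; i < topics.size(); i++) {
            TopicLoad prev = topics.get(i - 1);
            TopicLoad cur = topics.get(i);
            boolean full = accRate > maxMsgRate
                    || accThroughput > maxThroughput
                    || accRate + cur.msgRate > maxMsgRate
                    || accThroughput + cur.throughput > maxThroughput;
            if (full && cur.hash > prev.hash) {
                // The boundary b satisfies prev.hash < b <= cur.hash, so prev stays in the
                // lower range [.., b) and cur starts the upper range [b, ..).
                boundaries.add(prev.hash + (cur.hash - prev.hash) / 2 + 1);
                accRate = cur.msgRate;
                accThroughput = cur.throughput;
            } else {
                accRate += cur.msgRate;
                accThroughput += cur.throughput;
            }
        }
        return boundaries;
    }
}
```

Consider four topics at 40 msg/s each, with hashes 10, 20, 30 and 40, a 100 msg/s limit, and a 10% tolerance. The sketch returns a single boundary, 26, which puts hashes 10 and 20 in one bundle and 30 and 40 in the other.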



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +825,42 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {

Review Comment:
   Fixed. PTAL, thanks! @eolivelli 





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1187097010

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1234410886

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962953077


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   In org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl#checkNamespaceBundleSplit, the bundle to split is also selected with a MessageRate condition, and that condition uses the message rate, not the entry rate.
   Switching it to the entry rate would require adding an entry-rate field to the load information reported by the broker, which would be a relatively large change.
   
   [Screenshot: https://user-images.githubusercontent.com/19296967/188468870-011b2b08-dd8d-4313-b784-ca54512381c1.png]
   
   
   @codelipenghui 
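The trade-off can be made concrete with a hypothetical sketch of a threshold check over reported bundle load. This is not the actual `ModularLoadManagerImpl` code: the class, fields, and thresholds are invented for illustration. `entryRate` stands in for the field that, per the comment above, would have to be added to the broker's load report.

```java
// Hypothetical sketch of a bundle split-candidate check driven by reported load.
final class SplitCandidateCheck {

    // Hypothetical subset of the per-bundle load a broker could report.
    static final class BundleLoad {
        final double msgRate;    // messages/s, in + out
        final double throughput; // bytes/s, in + out
        final double entryRate;  // entries/s: the extra field an entry-based check would need

        BundleLoad(double msgRate, double throughput, double entryRate) {
            this.msgRate = msgRate;
            this.throughput = throughput;
            this.entryRate = entryRate;
        }
    }

    private SplitCandidateCheck() {}

    // Message-based check, in the spirit of the condition described above.
    static boolean exceedsByMessageRate(BundleLoad load, double maxMsgRate, double maxBandwidthBytes) {
        return load.msgRate > maxMsgRate || load.throughput > maxBandwidthBytes;
    }

    // Entry-based variant; only possible once entryRate is part of the reported load.
    static boolean exceedsByEntryRate(BundleLoad load, double maxEntryRate, double maxBandwidthBytes) {
        return load.entryRate > maxEntryRate || load.throughput > maxBandwidthBytes;
    }
}
```

For a heavily batched bundle the two checks can disagree: the message-based check flags the bundle for splitting while the entry-based check does not.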





[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962962215


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   OK, I will submit a new PIP to replace the message rate with the entry rate.





[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1196409637

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1205295734

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1205015475

   /pulsarbot run-failure-checks




[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r960468295


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        long loadBalancerNamespaceBundleMaxBandwidthBytes =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, Double> hashAndMsgMap = new HashMap<>();
+            Map<Long, Double> hashAndThroughput = new HashMap<>();
+            Map<Long, String> hashAndTopic = new HashMap<>();
+            List<Long> topicNameHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicNameHashList.add(hashCode);
+                hashAndMsgMap.put(hashCode, msgRate);
+                hashAndThroughput.put(hashCode, throughput);
+                hashAndTopic.put(hashCode, topic);
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicNameHashList.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            Collections.sort(topicNameHashList);
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = 0;
+            double bundleThroughputTmp = 0;
+            for (int i = 0; i < topicNameHashList.size(); i++) {
+                long topicHashCode = topicNameHashList.get(i);
+                bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
+                bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
+
+                if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
+                        || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
+                    long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
+                    long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
+                    long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
+                    splitResults.add(splitMiddle);
+                    bundleMsgRateTmp =  hashAndMsgMap.get(topicHashCode);

Review Comment:
   fixed
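The boundary search in this hunk walks the topics in ascending hash order. It adds up each topic's message rate and throughput, and places a cut whenever either running total exceeds its per-bundle limit. Below is a minimal standalone sketch of that idea. It assumes the per-topic loads are already collected into a `TreeMap` keyed by topic hash. `GreedySplitSketch`, `Load` and `splitBoundaries` are hypothetical names, not Pulsar API.

Two choices belong to the sketch rather than the hunk:

- It adds 1 to the midpoint. Assuming half-open `[lower, upper)` bundle ranges, this keeps the left topic's hash strictly below the boundary.
- When the very first topic alone exceeds a limit, it isolates that topic and restarts the running totals from zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of a greedy, load-based bundle split (not Pulsar API). */
final class GreedySplitSketch {

    /** Per-topic load: msgRate = rateIn + rateOut (msg/s), throughput = in + out (bytes/s). */
    static final class Load {
        final double msgRate;
        final double throughput;

        Load(double msgRate, double throughput) {
            this.msgRate = msgRate;
            this.throughput = throughput;
        }
    }

    /**
     * Returns split boundaries for topics keyed (and therefore sorted) by hash.
     * Returns an empty list when there are fewer than two topics or both bundle
     * totals stay below their limits scaled by (1 + diffThreshold).
     */
    static List<Long> splitBoundaries(TreeMap<Long, Load> loadsByHash,
                                      double maxMsgRate, double maxThroughput,
                                      double diffThreshold) {
        List<Long> boundaries = new ArrayList<>();
        double totalRate = 0;
        double totalThroughput = 0;
        for (Load load : loadsByHash.values()) {
            totalRate += load.msgRate;
            totalThroughput += load.throughput;
        }
        if (loadsByHash.size() < 2
                || (totalRate < maxMsgRate * (1 + diffThreshold)
                        && totalThroughput < maxThroughput * (1 + diffThreshold))) {
            return boundaries;
        }

        List<Long> hashes = new ArrayList<>(loadsByHash.keySet()); // ascending order
        double runningRate = 0;
        double runningThroughput = 0;
        for (int i = 0; i < hashes.size(); i++) {
            Load load = loadsByHash.get(hashes.get(i));
            runningRate += load.msgRate;
            runningThroughput += load.throughput;
            if (runningRate <= maxMsgRate && runningThroughput <= maxThroughput) {
                continue; // current range still fits
            }
            // Cut just before topic i; if i == 0, cut just after topic 0 instead.
            int left = i > 0 ? i - 1 : 0;
            long lo = hashes.get(left);
            long hi = hashes.get(left + 1);
            // Assumes half-open [lower, upper) ranges: lo < boundary <= hi.
            long boundary = lo + (hi - lo) / 2 + 1;
            if (boundaries.isEmpty() || boundaries.get(boundaries.size() - 1) != boundary) {
                boundaries.add(boundary);
            }
            // Topic i opens the next range; an isolated topic 0 leaves it empty.
            runningRate = i > 0 ? load.msgRate : 0;
            runningThroughput = i > 0 ? load.throughput : 0;
        }
        return boundaries;
    }
}
```

Consider three topics with hashes 10, 20 and 30, each at 60 msg/s, with a 100 msg/s limit and a 10% difference threshold. This sketch returns boundaries 16 and 26, which gives each topic its own range.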



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        long loadBalancerNamespaceBundleMaxBandwidthBytes =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, Double> hashAndMsgMap = new HashMap<>();
+            Map<Long, Double> hashAndThroughput = new HashMap<>();
+            Map<Long, String> hashAndTopic = new HashMap<>();
+            List<Long> topicNameHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicNameHashList.add(hashCode);
+                hashAndMsgMap.put(hashCode, msgRate);
+                hashAndThroughput.put(hashCode, throughput);
+                hashAndTopic.put(hashCode, topic);
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicNameHashList.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            Collections.sort(topicNameHashList);
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = 0;
+            double bundleThroughputTmp = 0;
+            for (int i = 0; i < topicNameHashList.size(); i++) {
+                long topicHashCode = topicNameHashList.get(i);
+                bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
+                bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
+
+                if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
+                        || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
+                    long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
+                    long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
+                    long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
+                    splitResults.add(splitMiddle);
+                    bundleMsgRateTmp =  hashAndMsgMap.get(topicHashCode);

Review Comment:
   @Jason918 PTAL, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223425835

   https://github.com/apache/pulsar/pull/16557


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223416457

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223501749

   Discussion thread: https://lists.apache.org/thread/cshyt10fwcjjxs93g8yf0svgwcgnshmg


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1236459295

   @eolivelli PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962832622


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   > I think this one is based on the message, not the entry right?
   
   Yes, this is based on the message rate.
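Pulsar producers can pack several messages into one batch, and each batch is stored as a single entry. A topic's entry rate can therefore be much lower than its message rate. The rates summed in this revision are message rates.

The per-topic aggregation in this revision can be sketched on its own as follows. `RateSample` is a hypothetical stand-in for the four `TopicStatsImpl` fields the hunk reads. `TopicLoadSketch.collect` is an illustrative name. Like the hunk, the sketch skips topics with no stats and topics with no traffic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the per-topic load aggregation (not Pulsar API). */
final class TopicLoadSketch {

    /** Stand-in for the four TopicStatsImpl rate fields read by the hunk. */
    static final class RateSample {
        final double msgRateIn;
        final double msgRateOut;
        final double throughputIn;
        final double throughputOut;

        RateSample(double msgRateIn, double msgRateOut, double throughputIn, double throughputOut) {
            this.msgRateIn = msgRateIn;
            this.msgRateOut = msgRateOut;
            this.throughputIn = throughputIn;
            this.throughputOut = throughputOut;
        }
    }

    /** Aggregated load of one topic: messages/s and bytes/s, in plus out. */
    static final class TopicInfo {
        final String topicName;
        final double msgRate;
        final double throughput;

        TopicInfo(String topicName, double msgRate, double throughput) {
            this.topicName = topicName;
            this.msgRate = msgRate;
            this.throughput = throughput;
        }
    }

    /** Returns loaded topics in input order, skipping topics without stats or without traffic. */
    static List<TopicInfo> collect(List<String> topics, Map<String, RateSample> statsByTopic) {
        List<TopicInfo> result = new ArrayList<>();
        for (String topic : topics) {
            RateSample s = statsByTopic.get(topic);
            if (s == null) {
                continue; // no stats available for this topic
            }
            double msgRate = s.msgRateIn + s.msgRateOut;          // messages, not entries
            double throughput = s.throughputIn + s.throughputOut; // bytes
            if (msgRate <= 0 && throughput <= 0) {
                continue; // idle topic carries no load
            }
            result.add(new TopicInfo(topic, msgRate, throughput));
        }
        return result;
    }
}
```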



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962839613


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   > I think this one is based on the message, not the entry right?
   
   Yes, this is based on the message rate, not the entry rate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1234062368

   @codelipenghui @eolivelli @hangc0276 @Technoboy- PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r962953077


##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    class TopicInfo {
+        String topicName;
+        double msgRate;
+        double throughput;
+
+        public TopicInfo(String topicName, double msgRate, double throughput) {
+            this.topicName = topicName;
+            this.msgRate = msgRate;
+            this.throughput = throughput;
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+        FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+                (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+        long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+                .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+            List<Long> topicHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();

Review Comment:
   In `org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl#checkNamespaceBundleSplit`, the logic that selects which bundle to split also has a message-rate condition. That condition likewise uses the message rate, not the entry rate:
   
   <img width="1394" alt="image" src="https://user-images.githubusercontent.com/19296967/188468870-011b2b08-dd8d-4313-b784-ca54512381c1.png">
   
   @codelipenghui 
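The new algorithm's early exit compares the bundle's summed `msgRateIn + msgRateOut` (messages per second) and in-plus-out throughput against per-bundle limits. Per the comment above, the load manager's split trigger is also message-based.

Below is a simplified sketch of the two checks side by side. It models only these two conditions and uses hypothetical names. It assumes the trigger also sums inbound and outbound rates. It is not a copy of `ModularLoadManagerImpl#checkNamespaceBundleSplit`, whose actual condition appears only in the screenshot. The MB-to-bytes conversion (`1024 * 1024`) and the `limit * (1 + threshold)` comparison follow the algorithm's hunk.

```java
/** Hypothetical sketch of the two rate checks discussed in this thread (not Pulsar API). */
final class SplitTriggerSketch {
    private static final long MBYTES = 1024 * 1024;

    /** Assumed trigger: combined in+out message rate or bandwidth over its per-bundle limit. */
    static boolean exceedsLimits(double msgRateIn, double msgRateOut,
                                 double throughputIn, double throughputOut,
                                 int maxMsgRate, int maxBandwidthMbytes) {
        double msgRate = msgRateIn + msgRateOut;          // messages/s, not entries/s
        double throughput = throughputIn + throughputOut; // bytes/s
        return msgRate > maxMsgRate || throughput > (double) maxBandwidthMbytes * MBYTES;
    }

    /** The algorithm's early exit, inverted: true if a total reaches limit * (1 + diff). */
    static boolean passesDiffThreshold(double msgRate, double throughput,
                                       int maxMsgRate, int maxBandwidthMbytes,
                                       int diffThresholdPercentage) {
        double diff = diffThresholdPercentage / 100.0;
        return msgRate >= maxMsgRate * (1 + diff)
                || throughput >= (double) maxBandwidthMbytes * MBYTES * (1 + diff);
    }
}
```

Under these assumptions, consider a bundle at 1050 msg/s with a 1000 msg/s limit and a 10% threshold. It exceeds the limit but still falls short of 1100 msg/s. The algorithm would therefore return no boundaries for it.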



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [improve][broker]Add new split algorithm: Split algorithm based on flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1183926343

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223586836

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1223416539

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1228716955

   @eolivelli PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#discussion_r959106537


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +824,34 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {
+            return ConcurrentOpenHashMap
+                    .<String, ConcurrentOpenHashMap<String, Topic>>newBuilder().build();
+        }).computeIfAbsent(bundle.toString(), k -> {
+            return ConcurrentOpenHashMap.<String, Topic>newBuilder().build();
+        });
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        topicMap.forEach((name, topic) -> {
+            topicStatsMap.put(name,
+                    topic.getStats(false, false, false));
+        });
+        return topicStatsMap;
+    }
+
     void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> completionFuture,
                                        NamespaceBundleSplitAlgorithm splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        Map<String, TopicStatsImpl> topicStatsMap = getTopicStats(bundle);

Review Comment:
   This should not be called if `flow_or_qps_equally_divide` is not enabled.
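The concern here is that collecting stats for every topic in the bundle is wasted work when another split algorithm is configured. One way to express that is to pass the collection step as a `Supplier` and invoke it only for `flow_or_qps_equally_divide`. This sketch assumes the algorithm is identified by its configured name. `LazyStatsSketch` and `statsIfNeeded` are hypothetical names, not necessarily how the PR resolved this. The later revision of the algorithm casts to a dedicated `FlowOrQpsEquallyDivideBundleSplitOption`, which suggests a different route was taken.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: only collect topic stats for the algorithm that reads them. */
final class LazyStatsSketch {
    static final String FLOW_OR_QPS_EQUALLY_DIVIDE = "flow_or_qps_equally_divide";

    /**
     * Invokes the (potentially expensive) per-topic stats collection only when
     * the configured split algorithm is flow_or_qps_equally_divide.
     */
    static <S> Map<String, S> statsIfNeeded(String algorithmName,
                                            Supplier<Map<String, S>> collector) {
        if (!FLOW_OR_QPS_EQUALLY_DIVIDE.equals(algorithmName)) {
            return Collections.emptyMap(); // assumed: other algorithms do not read topic stats
        }
        return collector.get();
    }
}
```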



##########
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java:
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+    private static final long MBytes = 1024 * 1024;
+
+    @Override
+    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
+        NamespaceService service = bundleSplitOption.getService();
+        NamespaceBundle bundle = bundleSplitOption.getBundle();
+        Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+        int loadBalancerNamespaceBundleMaxMsgRate =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+        long loadBalancerNamespaceBundleMaxBandwidthBytes =
+                bundleSplitOption.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+        double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+
+        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+            if (topics == null || topics.size() <= 1) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            double bundleThroughput = 0;
+            double bundleMsgRate = 0;
+            Map<Long, Double> hashAndMsgMap = new HashMap<>();
+            Map<Long, Double> hashAndThroughput = new HashMap<>();
+            Map<Long, String> hashAndTopic = new HashMap<>();
+            List<Long> topicNameHashList = new ArrayList<>(topics.size());
+            for (String topic : topics) {
+                TopicStatsImpl topicStats = topicStatsMap.get(topic);
+                if (topicStats == null) {
+                    continue;
+                }
+                double msgRateIn = topicStats.getMsgRateIn();
+                double msgRateOut = topicStats.getMsgRateOut();
+                double msgThroughputIn = topicStats.getMsgThroughputIn();
+                double msgThroughputOut = topicStats.getMsgThroughputOut();
+                double msgRate = msgRateIn + msgRateOut;
+                double throughput = msgThroughputIn + msgThroughputOut;
+
+                if (msgRate <= 0 && throughput <= 0) {
+                    // Skip empty topic
+                    continue;
+                }
+
+                Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+                topicNameHashList.add(hashCode);
+                hashAndMsgMap.put(hashCode, msgRate);
+                hashAndThroughput.put(hashCode, throughput);
+                hashAndTopic.put(hashCode, topic);
+                bundleThroughput += throughput;
+                bundleMsgRate += msgRate;
+            }
+
+            if (topicNameHashList.size() < 2
+                    || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+                    && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            Collections.sort(topicNameHashList);
+            List<Long> splitResults = new ArrayList<>();
+            double bundleMsgRateTmp = 0;
+            double bundleThroughputTmp = 0;
+            for (int i = 0; i < topicNameHashList.size(); i++) {
+                long topicHashCode = topicNameHashList.get(i);
+                bundleThroughputTmp += hashAndThroughput.get(topicHashCode);
+                bundleMsgRateTmp += hashAndMsgMap.get(topicHashCode);
+
+                if (bundleMsgRateTmp > loadBalancerNamespaceBundleMaxMsgRate
+                        || bundleThroughputTmp > loadBalancerNamespaceBundleMaxBandwidthBytes) {
+                    long splitStart = i > 0 ? topicNameHashList.get(i - 1) : topicHashCode;
+                    long splitEnd = i > 0 ? topicHashCode : topicNameHashList.get(i + 1);
+                    long splitMiddle = splitStart + (splitEnd - splitStart) / 2;
+                    splitResults.add(splitMiddle);
+                    bundleMsgRateTmp =  hashAndMsgMap.get(topicHashCode);

Review Comment:
   This doesn't seem right when `i == 0`. Could you point out the unit test that covers this case?
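
This is a minimal sketch of one way to restructure the boundary loop so that `i == 0` needs no special case. It is not the PR's code or its eventual fix. It makes several assumptions. Topics are already filtered and sorted by hash and are passed as parallel arrays. The class name is hypothetical. The `(1 + diffThreshold)` pre-check that decides whether to split at all is omitted. The loop cuts before topic `i` whenever adding that topic would push the current range past either limit.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the boundary search, restructured so the first topic needs
 * no special case. Inputs are parallel arrays sorted by ascending topic hash, with
 * empty topics already filtered out.
 */
final class FlowOrQpsBoundarySketch {
    private FlowOrQpsBoundarySketch() {
    }

    static List<Long> boundaries(long[] sortedHashes, double[] msgRates, double[] throughputs,
                                 double maxMsgRate, double maxThroughputBytes) {
        List<Long> splits = new ArrayList<>();
        double rangeMsgRate = 0;     // load of the range currently being filled
        double rangeThroughput = 0;
        for (int i = 0; i < sortedHashes.length; i++) {
            boolean overflow = rangeMsgRate + msgRates[i] > maxMsgRate
                    || rangeThroughput + throughputs[i] > maxThroughputBytes;
            // Only cut when a previous topic exists to close the range with. An
            // overloaded first topic is then isolated by the cut made when topic 1
            // is considered, so no boundary is emitted twice.
            if (i > 0 && overflow) {
                long start = sortedHashes[i - 1];
                long end = sortedHashes[i];
                splits.add(start + (end - start) / 2); // midpoint between neighbours
                rangeMsgRate = 0;
                rangeThroughput = 0;
            }
            // Topic i always belongs to the (possibly new) current range.
            rangeMsgRate += msgRates[i];
            rangeThroughput += throughputs[i];
        }
        return splits;
    }
}
```

Consider hashes `{10, 20, 30}`, message rates `{50, 1, 1}`, zero throughput and a rate limit of 10. This sketch returns `[15]`. Going by the lines quoted above, the original loop would emit 15 twice. At `i == 0` it resets the running rate to topic 0's own load (50), so topic 1 trips the limit again against the same pair of hashes.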



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java:
##########
@@ -822,13 +824,34 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean
         return unloadFuture;
     }
 
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        ConcurrentOpenHashMap<String, Topic> topicMap = pulsar.getBrokerService().getMultiLayerTopicMap()
+        .computeIfAbsent(bundle.getNamespaceObject().toString(), k -> {
+            return ConcurrentOpenHashMap
+                    .<String, ConcurrentOpenHashMap<String, Topic>>newBuilder().build();
+        }).computeIfAbsent(bundle.toString(), k -> {
+            return ConcurrentOpenHashMap.<String, Topic>newBuilder().build();
+        });
+
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+        topicMap.forEach((name, topic) -> {
+            topicStatsMap.put(name,
+                    topic.getStats(false, false, false));
+        });
+        return topicStatsMap;
+    }
+
     void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
                                        boolean unload,
                                        AtomicInteger counter,
                                        CompletableFuture<Void> completionFuture,
                                        NamespaceBundleSplitAlgorithm splitAlgorithm,
                                        List<Long> boundaries) {
-        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+        Map<String, TopicStatsImpl> topicStatsMap = getTopicStats(bundle);
+        BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries, topicStatsMap,

Review Comment:
   It would be better to create a separate `BundleSplitOption` class for each split algorithm, given how different their parameters are.
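
This is one possible shape for the reviewer's suggestion, sketched with hypothetical, simplified types. A base option class carries what every algorithm needs. A subclass holds the flow-or-QPS inputs. The algorithm interface is parameterized by its option type. The real `BundleSplitOption` also carries the `NamespaceService`, and the real `getSplitBoundary` returns a `CompletableFuture<List<Long>>`. Both are simplified away here, and all class names are illustrative only.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Inputs shared by every split algorithm (hypothetical, simplified). */
class BundleSplitOptionSketch {
    private final String bundle;
    private final List<Long> boundaries;

    BundleSplitOptionSketch(String bundle, List<Long> boundaries) {
        this.bundle = bundle;
        this.boundaries = boundaries == null ? Collections.emptyList() : List.copyOf(boundaries);
    }

    String getBundle() {
        return bundle;
    }

    List<Long> getBoundaries() {
        return boundaries;
    }
}

/** Inputs only the flow-or-QPS algorithm needs (hypothetical). */
final class FlowOrQpsSplitOptionSketch extends BundleSplitOptionSketch {
    private final Map<String, Double> topicLoad;
    private final int maxMsgRate;
    private final int maxBandwidthMbytes;
    private final int differenceThresholdPercentage;

    FlowOrQpsSplitOptionSketch(String bundle, Map<String, Double> topicLoad, int maxMsgRate,
                               int maxBandwidthMbytes, int differenceThresholdPercentage) {
        super(bundle, null); // this algorithm computes its own boundaries
        this.topicLoad = Map.copyOf(topicLoad);
        this.maxMsgRate = maxMsgRate;
        this.maxBandwidthMbytes = maxBandwidthMbytes;
        this.differenceThresholdPercentage = differenceThresholdPercentage;
    }

    Map<String, Double> getTopicLoad() { return topicLoad; }
    int getMaxMsgRate() { return maxMsgRate; }
    int getMaxBandwidthMbytes() { return maxBandwidthMbytes; }
    int getDifferenceThresholdPercentage() { return differenceThresholdPercentage; }
}

/**
 * Each algorithm names the option type it consumes, so an algorithm that needs the
 * richer options cannot be handed the base ones.
 */
interface SplitAlgorithmSketch<O extends BundleSplitOptionSketch> {
    List<Long> getSplitBoundary(O option);
}

/** Hypothetical: a "specified positions" algorithm only needs the shared options. */
final class SpecifiedPositionsSketch implements SplitAlgorithmSketch<BundleSplitOptionSketch> {
    @Override
    public List<Long> getSplitBoundary(BundleSplitOptionSketch option) {
        return option.getBoundaries();
    }
}
```

With this split, `splitAndOwnBundleOnceAndRetry` would only build the flow-or-QPS option, and only collect topic stats, when that algorithm is selected. That also addresses the earlier comment about calling `getTopicStats` unconditionally.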



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1231553022

   @Jason918 PTAL, thanks!


[GitHub] [pulsar] lordcheng10 commented on pull request #16557: [PIP-169] Support split bundle by flow or qps

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #16557:
URL: https://github.com/apache/pulsar/pull/16557#issuecomment-1236731024

   @eolivelli PTAL, thanks!
