You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/05 14:25:06 UTC
[pulsar] branch master updated: [PIP-169] Support split bundle by flow or qps (#16557)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8eb16cf02c5 [PIP-169] Support split bundle by flow or qps (#16557)
8eb16cf02c5 is described below
commit 8eb16cf02c5b396554ba812de15db8911907695a
Author: LinChen <15...@qq.com>
AuthorDate: Mon Sep 5 22:24:56 2022 +0800
[PIP-169] Support split bundle by flow or qps (#16557)
Fixes https://github.com/apache/pulsar/issues/16782
### Motivation
Bundle split currently supports three algorithms:
- range_equally_divide
- topic_count_equally_divide
- specified_positions_divide
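However, none of these algorithms divides a bundle according to its traffic, i.e. throughput ("flow") or message rate ("qps"). As a result, an overloaded bundle may have to be split multiple times before the load of each resulting bundle is acceptable.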
---
conf/broker.conf | 2 +-
deployment/terraform-ansible/templates/broker.conf | 2 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 15 +-
.../pulsar/broker/namespace/NamespaceService.java | 16 +-
.../pulsar/broker/service/BrokerService.java | 17 ++
...FlowOrQpsEquallyDivideBundleSplitAlgorithm.java | 129 ++++++++++++++
.../FlowOrQpsEquallyDivideBundleSplitOption.java | 46 +++++
.../naming/NamespaceBundleSplitAlgorithm.java | 6 +-
...OrQpsEquallyDivideBundleSplitAlgorithmTest.java | 194 +++++++++++++++++++++
.../naming/NamespaceBundleSplitAlgorithmTest.java | 7 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 3 +-
11 files changed, 428 insertions(+), 9 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 6f51c96a008..67655674203 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1225,7 +1225,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
-supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
+supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide,flow_or_qps_equally_divide
# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 5075b600b25..eb81a284e73 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -931,7 +931,7 @@ loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManage
# Supported algorithms name for namespace bundle split.
# "range_equally_divide" divides the bundle into two parts with the same hash range size.
# "topic_count_equally_divide" divides the bundle into two parts with the same topics count.
-supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
+supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide,flow_or_qps_equally_divide
# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fffa57ff4ab..30ca29faad8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2160,6 +2160,19 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;
+
+ // Tolerance, in percent, added on top of the per-bundle message-rate and bandwidth limits.
+ // FlowOrQpsEquallyDivideBundleSplitAlgorithm only computes split boundaries once the bundle's
+ // total message rate or throughput reaches limit * (100 + this value) / 100.
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "In FlowOrQpsEquallyDivideBundleSplitAlgorithm,"
+ + " if msgRate >= loadBalancerNamespaceBundleMaxMsgRate * "
+ + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0 "
+ + " or throughput >= loadBalancerNamespaceBundleMaxBandwidthMbytes * "
+ + " (100 + flowOrQpsDifferenceThresholdPercentage)/100.0, "
+ + " execute split bundle"
+ )
+ private int flowOrQpsDifferenceThresholdPercentage = 10;
+
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
@@ -2325,7 +2338,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Supported algorithms name for namespace bundle split"
)
private List<String> supportedNamespaceBundleSplitAlgorithms = Lists.newArrayList("range_equally_divide",
- "topic_count_equally_divide", "specified_positions_divide");
+ "topic_count_equally_divide", "specified_positions_divide", "flow_or_qps_equally_divide");
@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 4b8d02da002..3467d25777e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -74,6 +74,7 @@ import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
+import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -87,6 +88,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -828,7 +830,19 @@ public class NamespaceService implements AutoCloseable {
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
- BundleSplitOption bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+ BundleSplitOption bundleSplitOption;
+ if (config.getDefaultNamespaceBundleSplitAlgorithm()
+ .equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
+ Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
+ bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
+ topicStatsMap,
+ config.getLoadBalancerNamespaceBundleMaxMsgRate(),
+ config.getLoadBalancerNamespaceBundleMaxBandwidthMbytes(),
+ config.getFlowOrQpsDifferenceThresholdPercentage());
+ } else {
+ bundleSplitOption = new BundleSplitOption(this, bundle, boundaries);
+ }
+
splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
if (ex == null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 35a5e8ad34d..7c0afdb712a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -462,6 +462,23 @@ public class BrokerService implements Closeable {
return bootstrap;
}
+    /**
+     * Collects the stats of every topic registered under the given bundle in the multi-layer topic map.
+     *
+     * <p>This is a pure query: it uses plain lookups and never inserts placeholder entries for a
+     * namespace or bundle that is not present in the map.
+     *
+     * @param bundle the bundle whose topics should be inspected
+     * @return a new, mutable map from topic name to that topic's stats; empty when no topic is
+     *         registered for the bundle's namespace or for the bundle itself
+     */
+    public Map<String, TopicStatsImpl> getTopicStats(NamespaceBundle bundle) {
+        Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+
+        // Look the namespace up without computeIfAbsent so that asking for stats cannot
+        // create empty namespace/bundle maps as a side effect.
+        ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> bundlesOfNamespace =
+                getMultiLayerTopicMap().get(bundle.getNamespaceObject().toString());
+        if (bundlesOfNamespace == null) {
+            return topicStatsMap;
+        }
+
+        ConcurrentOpenHashMap<String, Topic> topicsOfBundle = bundlesOfNamespace.get(bundle.toString());
+        if (topicsOfBundle == null) {
+            return topicStatsMap;
+        }
+
+        topicsOfBundle.forEach((name, topic) ->
+                topicStatsMap.put(name, topic.getStats(false, false, false)));
+        return topicStatsMap;
+    }
+
public void start() throws Exception {
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java
new file mode 100644
index 00000000000..010d725c5ea
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithm.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+
+/**
+ * Split algorithm based on flow or qps.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
+ private static final long MBytes = 1024 * 1024;
+
+ class TopicInfo {
+ String topicName;
+ double msgRate;
+ double throughput;
+
+ public TopicInfo(String topicName, double msgRate, double throughput) {
+ this.topicName = topicName;
+ this.msgRate = msgRate;
+ this.throughput = throughput;
+ }
+ }
+
+ @Override
+ public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOptionTmp) {
+ FlowOrQpsEquallyDivideBundleSplitOption bundleSplitOption =
+ (FlowOrQpsEquallyDivideBundleSplitOption) bundleSplitOptionTmp;
+ NamespaceService service = bundleSplitOption.getService();
+ NamespaceBundle bundle = bundleSplitOption.getBundle();
+ Map<String, TopicStatsImpl> topicStatsMap = bundleSplitOption.getTopicStatsMap();
+ int loadBalancerNamespaceBundleMaxMsgRate = bundleSplitOption.getLoadBalancerNamespaceBundleMaxMsgRate();
+ double diffThreshold = bundleSplitOption.getFlowOrQpsDifferenceThresholdPercentage() / 100.0;
+ long loadBalancerNamespaceBundleMaxBandwidthBytes = bundleSplitOption
+ .getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * MBytes;
+
+
+ return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
+ if (topics == null || topics.size() <= 1) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ double bundleThroughput = 0;
+ double bundleMsgRate = 0;
+ Map<Long, TopicInfo> topicInfoMap = new HashMap<>();
+ List<Long> topicHashList = new ArrayList<>(topics.size());
+ for (String topic : topics) {
+ TopicStatsImpl topicStats = topicStatsMap.get(topic);
+ if (topicStats == null) {
+ continue;
+ }
+ double msgRateIn = topicStats.getMsgRateIn();
+ double msgRateOut = topicStats.getMsgRateOut();
+ double msgThroughputIn = topicStats.getMsgThroughputIn();
+ double msgThroughputOut = topicStats.getMsgThroughputOut();
+ double msgRate = msgRateIn + msgRateOut;
+ double throughput = msgThroughputIn + msgThroughputOut;
+ if (msgRate <= 0 && throughput <= 0) {
+ // Skip empty topic
+ continue;
+ }
+
+ Long hashCode = bundle.getNamespaceBundleFactory().getLongHashCode(topic);
+ topicHashList.add(hashCode);
+ topicInfoMap.put(hashCode, new TopicInfo(topic, msgRate, throughput));
+ bundleThroughput += throughput;
+ bundleMsgRate += msgRate;
+ }
+
+ if (topicInfoMap.size() < 2
+ || (bundleMsgRate < (loadBalancerNamespaceBundleMaxMsgRate * (1 + diffThreshold))
+ && bundleThroughput < (loadBalancerNamespaceBundleMaxBandwidthBytes * (1 + diffThreshold)))) {
+ return CompletableFuture.completedFuture(null);
+ }
+ Collections.sort(topicHashList);
+
+
+ List<Long> splitResults = new ArrayList<>();
+ double bundleMsgRateTmp = topicInfoMap.get(topicHashList.get(0)).msgRate;
+ double bundleThroughputTmp = topicInfoMap.get(topicHashList.get(0)).throughput;
+
+ for (int i = 1; i < topicHashList.size(); i++) {
+ long topicHashCode = topicHashList.get(i);
+ double msgRate = topicInfoMap.get(topicHashCode).msgRate;
+ double throughput = topicInfoMap.get(topicHashCode).throughput;
+
+ if ((bundleMsgRateTmp + msgRate) > loadBalancerNamespaceBundleMaxMsgRate
+ || (bundleThroughputTmp + throughput) > loadBalancerNamespaceBundleMaxBandwidthBytes) {
+ long splitStart = topicHashList.get(i - 1);
+ long splitEnd = topicHashList.get(i);
+ long splitMiddle = splitStart + (splitEnd - splitStart) / 2 + 1;
+ splitResults.add(splitMiddle);
+
+ bundleMsgRateTmp = msgRate;
+ bundleThroughputTmp = throughput;
+ } else {
+ bundleMsgRateTmp += msgRate;
+ bundleThroughputTmp += throughput;
+ }
+ }
+
+ return CompletableFuture.completedFuture(splitResults);
+ });
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java
new file mode 100644
index 00000000000..eba043ef290
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitOption.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import java.util.List;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+
+/**
+ * Split option carrying the extra inputs read by {@link FlowOrQpsEquallyDivideBundleSplitAlgorithm}:
+ * the stats of the bundle's topics and the limits used to decide where to split.
+ * All values are fixed at construction time.
+ */
+@Getter
+public class FlowOrQpsEquallyDivideBundleSplitOption extends BundleSplitOption {
+    /** Stats of the bundle's topics, keyed by topic name. */
+    private final Map<String, TopicStatsImpl> topicStatsMap;
+    /** Maximum message rate (in + out) a single bundle should carry. */
+    private final int loadBalancerNamespaceBundleMaxMsgRate;
+    /** Maximum throughput (in + out) a single bundle should carry, in MBytes. */
+    private final int loadBalancerNamespaceBundleMaxBandwidthMbytes;
+    /** Tolerance, in percent, applied on top of both limits before a split is computed. */
+    private final int flowOrQpsDifferenceThresholdPercentage;
+
+    /**
+     * @param namespaceService                              forwarded to {@link BundleSplitOption}
+     * @param bundle                                        the bundle to split, forwarded to
+     *                                                      {@link BundleSplitOption}
+     * @param boundaries                                    forwarded to {@link BundleSplitOption}
+     * @param topicStatsMap                                 stats of the bundle's topics keyed by topic name
+     * @param loadBalancerNamespaceBundleMaxMsgRate         per-bundle message-rate limit
+     * @param loadBalancerNamespaceBundleMaxBandwidthMbytes per-bundle bandwidth limit in MBytes
+     * @param flowOrQpsDifferenceThresholdPercentage        tolerance in percent applied to both limits
+     */
+    public FlowOrQpsEquallyDivideBundleSplitOption(NamespaceService namespaceService, NamespaceBundle bundle,
+                                                   List<Long> boundaries,
+                                                   Map<String, TopicStatsImpl> topicStatsMap,
+                                                   int loadBalancerNamespaceBundleMaxMsgRate,
+                                                   int loadBalancerNamespaceBundleMaxBandwidthMbytes,
+                                                   int flowOrQpsDifferenceThresholdPercentage) {
+        super(namespaceService, bundle, boundaries);
+        this.topicStatsMap = topicStatsMap;
+        this.loadBalancerNamespaceBundleMaxMsgRate = loadBalancerNamespaceBundleMaxMsgRate;
+        this.loadBalancerNamespaceBundleMaxBandwidthMbytes = loadBalancerNamespaceBundleMaxBandwidthMbytes;
+        this.flowOrQpsDifferenceThresholdPercentage = flowOrQpsDifferenceThresholdPercentage;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
index a4072ddc20e..6c54483244c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithm.java
@@ -30,14 +30,16 @@ public interface NamespaceBundleSplitAlgorithm {
String RANGE_EQUALLY_DIVIDE_NAME = "range_equally_divide";
String TOPIC_COUNT_EQUALLY_DIVIDE = "topic_count_equally_divide";
String SPECIFIED_POSITIONS_DIVIDE = "specified_positions_divide";
+ String FLOW_OR_QPS_EQUALLY_DIVIDE = "flow_or_qps_equally_divide";
List<String> AVAILABLE_ALGORITHMS = Lists.newArrayList(RANGE_EQUALLY_DIVIDE_NAME,
- TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_POSITIONS_DIVIDE);
+ TOPIC_COUNT_EQUALLY_DIVIDE, SPECIFIED_POSITIONS_DIVIDE, FLOW_OR_QPS_EQUALLY_DIVIDE);
NamespaceBundleSplitAlgorithm RANGE_EQUALLY_DIVIDE_ALGO = new RangeEquallyDivideBundleSplitAlgorithm();
NamespaceBundleSplitAlgorithm TOPIC_COUNT_EQUALLY_DIVIDE_ALGO = new TopicCountEquallyDivideBundleSplitAlgorithm();
NamespaceBundleSplitAlgorithm SPECIFIED_POSITIONS_DIVIDE_ALGO =
new SpecifiedPositionsBundleSplitAlgorithm();
+ NamespaceBundleSplitAlgorithm FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
static NamespaceBundleSplitAlgorithm of(String algorithmName) {
if (algorithmName == null) {
@@ -50,6 +52,8 @@ public interface NamespaceBundleSplitAlgorithm {
return TOPIC_COUNT_EQUALLY_DIVIDE_ALGO;
case SPECIFIED_POSITIONS_DIVIDE:
return SPECIFIED_POSITIONS_DIVIDE_ALGO;
+ case FLOW_OR_QPS_EQUALLY_DIVIDE:
+ return FLOW_OR_QPS_EQUALLY_DIVIDE_ALGO;
default:
return null;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java
new file mode 100644
index 00000000000..c7c1fd10263
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/FlowOrQpsEquallyDivideBundleSplitAlgorithmTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.naming;
+
+import com.google.common.hash.Hashing;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.testng.annotations.Test;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doReturn;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link FlowOrQpsEquallyDivideBundleSplitAlgorithm}, run against mocked
+ * NamespaceService / NamespaceBundle / NamespaceBundleFactory instances.
+ */
+public class FlowOrQpsEquallyDivideBundleSplitAlgorithmTest {
+
+ /**
+ * Feeds the algorithm 241 topics with traffic and checks that every segment ending at a
+ * returned split position stays below the message-rate and bandwidth limits.
+ * Topics located after the last split position are not checked.
+ */
+ @Test
+ public void testSplitBundleByFlowOrQps() {
+ FlowOrQpsEquallyDivideBundleSplitAlgorithm algorithm = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
+ int loadBalancerNamespaceBundleMaxMsgRate = 1010;
+ int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
+ int flowOrQpsDifferenceThresholdPercentage = 10;
+
+ Map<Long, Double> hashAndMsgMap = new HashMap<>();
+ Map<Long, Double> hashAndThroughput = new HashMap<>();
+ Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+ List<String> mockTopics = new ArrayList<>();
+ List<Long> hashList = new ArrayList<>();
+
+ // First group: topics test1..test5 with 20 partitions each, 49.5 msg/s (in + out) per partition.
+ for (int i = 1; i < 6; i++) {
+ String topicName = "persistent://test-tenant1/test-namespace1/test" + i;
+ for (int j = 0; j < 20; j++) {
+ String tp = topicName + "-partition-" + j;
+ mockTopics.add(tp);
+ TopicStatsImpl topicStats = new TopicStatsImpl();
+ topicStats.msgRateIn = 24.5;
+ topicStats.msgThroughputIn = 1000;
+ topicStats.msgRateOut = 25;
+ topicStats.msgThroughputOut = 1000;
+ topicStatsMap.put(tp, topicStats);
+ }
+ }
+
+
+ // Second group: topics test6..test12 with 20 partitions each, 50.5 msg/s (in + out) per partition.
+ for (int i = 6; i < 13; i++) {
+ String topicName = "persistent://test-tenant1/test-namespace1/test" + i;
+ for (int j = 0; j < 20; j++) {
+ String tp = topicName + "-partition-" + j;
+ mockTopics.add(tp);
+ TopicStatsImpl topicStats = new TopicStatsImpl();
+ topicStats.msgRateIn = 25.5;
+ topicStats.msgThroughputIn = 1000;
+ topicStats.msgRateOut = 25;
+ topicStats.msgThroughputOut = 1000;
+ topicStatsMap.put(tp, topicStats);
+ }
+ }
+
+ // One extra topic with 60 msg/s (in + out).
+ String tp = "persistent://test-tenant1/test-namespace1/test695-partition-0";
+ mockTopics.add(tp);
+ TopicStatsImpl topicStats = new TopicStatsImpl();
+ topicStats.msgRateIn = 25;
+ topicStats.msgThroughputIn = 1000;
+ topicStats.msgRateOut = 35;
+ topicStats.msgThroughputOut = 1000;
+ topicStatsMap.put(tp, topicStats);
+
+ // -- do test
+ NamespaceService mockNamespaceService = mock(NamespaceService.class);
+ NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+ doReturn(CompletableFuture.completedFuture(mockTopics))
+ .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+ NamespaceBundleFactory mockNamespaceBundleFactory = mock(NamespaceBundleFactory.class);
+ doReturn(mockNamespaceBundleFactory)
+ .when(mockNamespaceBundle).getNamespaceBundleFactory();
+ // Stub the factory with the CRC32 hash of each topic name and record the expected
+ // per-hash message rate and throughput for the verification loop below.
+ mockTopics.forEach((topic) -> {
+ long hashValue = Hashing.crc32().hashString(topic, UTF_8).padToLong();
+ doReturn(hashValue)
+ .when(mockNamespaceBundleFactory).getLongHashCode(topic);
+ hashList.add(hashValue);
+ hashAndMsgMap.put(hashValue, topicStatsMap.get(topic).msgRateIn
+ + topicStatsMap.get(topic).msgRateOut);
+ hashAndThroughput.put(hashValue, topicStatsMap.get(topic).msgThroughputIn
+ + topicStatsMap.get(topic).msgThroughputOut);
+ });
+
+ List<Long> splitPositions = algorithm.getSplitBoundary(new FlowOrQpsEquallyDivideBundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+ null, topicStatsMap, loadBalancerNamespaceBundleMaxMsgRate,
+ loadBalancerNamespaceBundleMaxBandwidthMbytes, flowOrQpsDifferenceThresholdPercentage)).join();
+
+ Collections.sort(hashList);
+ // i keeps advancing across split positions, so each iteration sums only the topics
+ // between the previous split position and the current one.
+ int i = 0;
+ for (Long position : splitPositions) {
+ Long endPosition = position;
+ double bundleMsgRateTmp = 0;
+ double bundleThroughputTmp = 0;
+ while (hashList.get(i) < endPosition) {
+ bundleMsgRateTmp += hashAndMsgMap.get(hashList.get(i));
+ bundleThroughputTmp += hashAndThroughput.get(hashList.get(i));
+ i++;
+ }
+ assertTrue(bundleMsgRateTmp < loadBalancerNamespaceBundleMaxMsgRate);
+ assertTrue(bundleThroughputTmp < loadBalancerNamespaceBundleMaxBandwidthMbytes * 1024 * 1024);
+ }
+ }
+
+
+ /**
+ * Gives the topic with the smallest hash a message rate that alone exceeds the limit and
+ * checks that the first split position lies between the first and the second hash.
+ */
+ @Test
+ public void testFirstPositionIsOverLoad() {
+ FlowOrQpsEquallyDivideBundleSplitAlgorithm algorithm = new FlowOrQpsEquallyDivideBundleSplitAlgorithm();
+ int loadBalancerNamespaceBundleMaxMsgRate = 1010;
+ int loadBalancerNamespaceBundleMaxBandwidthMbytes = 100;
+ int flowOrQpsDifferenceThresholdPercentage = 10;
+ int topicNum = 5;
+
+ List<String> mockTopics = new ArrayList<>();
+ List<Long> topicHashList = new ArrayList<>(topicNum);
+ Map<Long, String> hashAndTopic = new HashMap<>();
+
+ for (int i = 0; i < topicNum; i++) {
+ String topicName = "persistent://test-tenant1/test-namespace1/test-partition-" + i;
+ mockTopics.add(topicName);
+ long hashValue = Hashing.crc32().hashString(topicName, UTF_8).padToLong();
+ topicHashList.add(hashValue);
+ hashAndTopic.put(hashValue, topicName);
+ }
+ Collections.sort(topicHashList);
+
+ Map<String, TopicStatsImpl> topicStatsMap = new HashMap<>();
+
+ // The topic at the lowest hash position carries 2000 msg/s (in + out), above the 1010 limit.
+ long hashValue = topicHashList.get(0);
+ String topicName = hashAndTopic.get(hashValue);
+ TopicStatsImpl topicStats0 = new TopicStatsImpl();
+ topicStats0.msgRateIn = 1000;
+ topicStats0.msgThroughputIn = 1000;
+ topicStats0.msgRateOut = 1000;
+ topicStats0.msgThroughputOut = 1000;
+ topicStatsMap.put(topicName, topicStats0);
+
+ // Every other topic carries 49.5 msg/s (in + out).
+ for (int i = 1; i < topicHashList.size(); i++) {
+ hashValue = topicHashList.get(i);
+ topicName = hashAndTopic.get(hashValue);
+ TopicStatsImpl topicStats = new TopicStatsImpl();
+ topicStats.msgRateIn = 24.5;
+ topicStats.msgThroughputIn = 1000;
+ topicStats.msgRateOut = 25;
+ topicStats.msgThroughputOut = 1000;
+ topicStatsMap.put(topicName, topicStats);
+ }
+
+ // -- do test
+ NamespaceService mockNamespaceService = mock(NamespaceService.class);
+ NamespaceBundle mockNamespaceBundle = mock(NamespaceBundle.class);
+ doReturn(CompletableFuture.completedFuture(mockTopics))
+ .when(mockNamespaceService).getOwnedTopicListForNamespaceBundle(mockNamespaceBundle);
+ NamespaceBundleFactory mockNamespaceBundleFactory = mock(NamespaceBundleFactory.class);
+ doReturn(mockNamespaceBundleFactory)
+ .when(mockNamespaceBundle).getNamespaceBundleFactory();
+ mockTopics.forEach((topic) -> {
+ long hash = Hashing.crc32().hashString(topic, UTF_8).padToLong();
+ doReturn(hash)
+ .when(mockNamespaceBundleFactory).getLongHashCode(topic);
+ });
+ List<Long> splitPositions = algorithm.getSplitBoundary(new FlowOrQpsEquallyDivideBundleSplitOption(mockNamespaceService, mockNamespaceBundle,
+ null, topicStatsMap, loadBalancerNamespaceBundleMaxMsgRate,
+ loadBalancerNamespaceBundleMaxBandwidthMbytes, flowOrQpsDifferenceThresholdPercentage)).join();
+
+ // Expected boundary: the same midpoint formula the algorithm applies to two neighbouring hashes.
+ long splitStart = topicHashList.get(0);
+ long splitEnd = topicHashList.get(1);
+ long splitMiddle = splitStart + (splitEnd - splitStart) / 2 + 1;
+ assertTrue(splitPositions.get(0) == splitMiddle);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
index a88f8081aef..75f147e2829 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/NamespaceBundleSplitAlgorithmTest.java
@@ -19,12 +19,11 @@
package org.apache.pulsar.common.naming;
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_NAME;
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE;
-import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm.*;
+
public class NamespaceBundleSplitAlgorithmTest {
@@ -40,5 +39,7 @@ public class NamespaceBundleSplitAlgorithmTest {
Assert.assertTrue(topicCountEquallyDivide instanceof TopicCountEquallyDivideBundleSplitAlgorithm);
NamespaceBundleSplitAlgorithm specifiedTopicCountEquallyDivide = NamespaceBundleSplitAlgorithm.of(SPECIFIED_POSITIONS_DIVIDE);
Assert.assertTrue(specifiedTopicCountEquallyDivide instanceof SpecifiedPositionsBundleSplitAlgorithm);
+ NamespaceBundleSplitAlgorithm flowOrQPSEquallyDivide = NamespaceBundleSplitAlgorithm.of(FLOW_OR_QPS_EQUALLY_DIVIDE);
+ Assert.assertTrue(flowOrQPSEquallyDivide instanceof FlowOrQpsEquallyDivideBundleSplitAlgorithm);
}
}
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index c89bb152fc1..323b47a2359 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -906,7 +906,8 @@ public class CmdNamespaces extends CmdBase {
@Parameter(names = { "--split-algorithm-name", "-san" }, description = "Algorithm name for split "
+ "namespace bundle. Valid options are: [range_equally_divide, topic_count_equally_divide, "
- + "specified_positions_divide]. Use broker side config if absent", required = false)
+ + "specified_positions_divide, flow_or_qps_equally_divide]. Use broker side config if absent"
+ , required = false)
private String splitAlgorithmName;
@Parameter(names = { "--split-boundaries",