You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:16:40 UTC
[pulsar] 07/09: Avoid to infinitely split bundle (#11937)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d02653abd5541a44887c806abf586715f56cb33d
Author: Shoothzj <sh...@gmail.com>
AuthorDate: Wed Sep 8 23:28:13 2021 +0800
Avoid to infinitely split bundle (#11937)
(cherry picked from commit bef3757029eb3e48a490a02871689ab3e6abdfa5)
---
.../loadbalance/impl/BundleSplitterTask.java | 8 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 2 +-
.../loadbalance/impl/BundleSplitterTaskTest.java | 105 +++++++++++++++++++++
.../pulsar/client/api/BrokerServiceLookupTest.java | 15 ++-
4 files changed, 122 insertions(+), 8 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index bb1f990..e81fb50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -42,10 +42,8 @@ public class BundleSplitterTask implements BundleSplitStrategy {
/**
* Construct a BundleSplitterTask.
*
- * @param pulsar
- * Service to construct from.
*/
- public BundleSplitterTask(final PulsarService pulsar) {
+ public BundleSplitterTask() {
bundleCache = new HashSet<>();
}
@@ -74,6 +72,10 @@ public class BundleSplitterTask implements BundleSplitStrategy {
for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
+ if (stats.topics == 1) {
+ log.info("namespace bundle {} only have 1 topic", bundle);
+ continue;
+ }
double totalMessageRate = 0;
double totalMessageThroughput = 0;
// Attempt to consider long-term message data, otherwise effectively ignore.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 5b7867e..5035b00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -248,7 +248,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
brokerHostUsage = new GenericBrokerHostUsageImpl(pulsar);
}
- bundleSplitStrategy = new BundleSplitterTask(pulsar);
+ bundleSplitStrategy = new BundleSplitterTask();
conf = pulsar.getConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
new file mode 100644
index 0000000..7480989
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.impl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerData;
+import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageMessageData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
+import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * @author hezhangjian
+ */
+@Slf4j
+@Test(groups = "broker")
+public class BundleSplitterTaskTest {
+
+ private LocalBookkeeperEnsemble bkEnsemble;
+
+ private PulsarService pulsar;
+
+ @BeforeMethod
+ void setup() throws Exception {
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+ bkEnsemble.start();
+ // Start broker
+ ServiceConfiguration config = new ServiceConfiguration();
+ config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ config.setClusterName("use");
+ config.setWebServicePort(Optional.of(0));
+ config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+
+ config.setAdvertisedAddress("localhost");
+ config.setBrokerShutdownTimeoutMs(0L);
+ config.setBrokerServicePort(Optional.of(0));
+ config.setBrokerServicePortTls(Optional.of(0));
+ config.setWebServicePortTls(Optional.of(0));
+ pulsar = new PulsarService(config);
+ pulsar.start();
+ }
+
+ @Test
+ public void testSplitTaskWhenTopicJustOne() {
+ final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask();
+ LoadData loadData = new LoadData();
+
+ LocalBrokerData brokerData = new LocalBrokerData();
+ Map<String, NamespaceBundleStats> lastStats = new HashMap<>();
+ final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats();
+ namespaceBundleStats.topics = 1;
+ lastStats.put("ten/ns/0x00000000_0x80000000", namespaceBundleStats);
+ brokerData.setLastStats(lastStats);
+ loadData.getBrokerData().put("broker", new BrokerData(brokerData));
+
+ BundleData bundleData = new BundleData();
+ TimeAverageMessageData averageMessageData = new TimeAverageMessageData();
+ averageMessageData.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate());
+ averageMessageData.setMsgRateOut(1);
+ bundleData.setLongTermData(averageMessageData);
+ loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", bundleData);
+
+ final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+ Assert.assertEquals(bundlesToSplit.size(), 0);
+ }
+
+
+ @AfterMethod(alwaysRun = true)
+ void shutdown() throws Exception {
+ log.info("--- Shutting down ---");
+ pulsar.close();
+ bkEnsemble.stop();
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index cb349bf..36ff448 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -1019,6 +1019,13 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1)
.subscriptionName("my-subscriber-name").subscribe();
+ // there should be more than one topic to trigger split
+ final String topic2 = "persistent://" + namespace + "/topic2";
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+ .subscriptionName("my-subscriber-name")
+ .subscribe();
+
// (4) Broker-1 will own topic-1
final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
@@ -1054,7 +1061,7 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
updateAllMethod.invoke(loadManager);
conf2.setLoadBalancerAutoBundleSplitEnabled(true);
conf2.setLoadBalancerAutoUnloadSplitBundlesEnabled(true);
- conf2.setLoadBalancerNamespaceBundleMaxTopics(0);
+ conf2.setLoadBalancerNamespaceBundleMaxTopics(1);
loadManager.checkNamespaceBundleSplit();
// (6) Broker-2 should get the watch and update bundle cache
@@ -1063,15 +1070,15 @@ public class BrokerServiceLookupTest extends ProducerConsumerBase {
});
// (7) Make lookup request again to Broker-2 which should succeed.
- final String topic2 = "persistent://" + namespace + "/topic2";
+ final String topic3 = "persistent://" + namespace + "/topic3";
@Cleanup
- Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
+ Consumer<byte[]> consumer3 = pulsarClient2.newConsumer().topic(topic3)
.subscriptionName("my-subscriber-name")
.subscribe();
Awaitility.await().untilAsserted(() -> {
NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
- .getBundle(TopicName.get(topic2));
+ .getBundle(TopicName.get(topic3));
assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
});
} finally {