You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/05/09 06:20:51 UTC
[pulsar] branch master updated: [broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f5019c81d26 [broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)
f5019c81d26 is described below
commit f5019c81d2680b816eac768b1896333e04ab4214
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon May 9 14:20:42 2022 +0800
[broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)
* clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* update unit test clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* update unit test clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
* change the unit test function argument lists
* fix unit test import list
Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 12 ++++
.../broker/namespace/NamespaceServiceTest.java | 70 +++++++++++++++++++++-
2 files changed, 80 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 19e00000bc9..9b618346e61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -534,6 +534,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// load management decisions may be made.
private void updateBundleData() {
final Map<String, BundleData> bundleData = loadData.getBundleData();
+ final Set<String> activeBundles = new HashSet<>();
// Iterate over the broker data.
for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
final String broker = brokerEntry.getKey();
@@ -545,6 +546,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
for (Map.Entry<String, NamespaceBundleStats> entry : statsMap.entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
+ activeBundles.add(bundle);
if (bundleData.containsKey(bundle)) {
// If we recognize the bundle, add these stats as a new sample.
bundleData.get(bundle).update(stats);
@@ -557,6 +559,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
}
}
+ //Remove not active bundle from loadData
+ for (String bundle : bundleData.keySet()) {
+ if (!activeBundles.contains(bundle)){
+ bundleData.remove(bundle);
+ if (pulsar.getLeaderElectionService().isLeader()){
+ deleteBundleDataFromMetadataStore(bundle);
+ }
+ }
+ }
+
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
synchronized (preallocatedBundleData) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 5c0ffebbeec..36278f57dbd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -43,21 +43,29 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -67,6 +75,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -655,6 +664,63 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}
+ @Test
+ public void testModularLoadManagerRemoveInactiveBundleFromLoadData() throws Exception {
+ final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+ final String namespace = "pulsar/test/ns1";
+ final String topic1 = "persistent://" + namespace + "/topic1";
+ final String topic2 = "persistent://" + namespace + "/topic2";
+
+ // configure broker with ModularLoadManager
+ conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ restartBroker();
+
+ LoadManager loadManager = spy(pulsar.getLoadManager().get());
+ Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
+ loadManagerField.setAccessible(true);
+ doReturn(true).when(loadManager).isCentralized();
+ SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+ Optional<ResourceUnit> res = Optional.of(resourceUnit);
+ doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
+ loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+ .subscriptionName("my-subscriber-name1").subscribe();
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+ .subscriptionName("my-subscriber-name2").subscribe();
+
+ //create znode for bundle-data
+ pulsar.getBrokerService().updateRates();
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ //split bundle
+ NamespaceName nsname = NamespaceName.get(namespace);
+ NamespaceBundles bundles = pulsar.getNamespaceService().getNamespaceBundleFactory().getBundles(nsname);
+ NamespaceBundle oldBundle = bundles.findBundle(TopicName.get(topic1));
+ pulsar.getNamespaceService().splitAndOwnBundle(oldBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null).get();
+
+ // update broker bundle report to zk
+ pulsar.getBrokerService().updateRates();
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ Field loadDataFiled = ModularLoadManagerImpl.class.getDeclaredField("loadData");
+ loadDataFiled.setAccessible(true);
+ LoadData loadData = (LoadData)loadDataFiled
+ .get((ModularLoadManagerImpl) ((ModularLoadManagerWrapper) loadManager).getLoadManager());
+ MetadataCache<BundleData> bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
+
+ Awaitility.await().untilAsserted(() -> {
+ assertNull(loadData.getBundleData().get(oldBundle.toString()));
+ assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" + oldBundle.toString()).get());
+ });
+ }
+
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {