You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/05/09 06:20:51 UTC

[pulsar] branch master updated: [broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f5019c81d26 [broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)
f5019c81d26 is described below

commit f5019c81d2680b816eac768b1896333e04ab4214
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon May 9 14:20:42 2022 +0800

    [broker] clean inactive bundle from bundleData in loadData and bundlesCache (#13974)
    
    * clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * update unit test clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * update unit test clean inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * inactive bundle from bundleData in loadData and bundlesCache after the bundle be split or unload
    
    * change the unit test function argument lists
    
    * fix unit test import list
    
    Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 12 ++++
 .../broker/namespace/NamespaceServiceTest.java     | 70 +++++++++++++++++++++-
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 19e00000bc9..9b618346e61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -534,6 +534,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
     // load management decisions may be made.
     private void updateBundleData() {
         final Map<String, BundleData> bundleData = loadData.getBundleData();
+        final Set<String> activeBundles = new HashSet<>();
         // Iterate over the broker data.
         for (Map.Entry<String, BrokerData> brokerEntry : loadData.getBrokerData().entrySet()) {
             final String broker = brokerEntry.getKey();
@@ -545,6 +546,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
             for (Map.Entry<String, NamespaceBundleStats> entry : statsMap.entrySet()) {
                 final String bundle = entry.getKey();
                 final NamespaceBundleStats stats = entry.getValue();
+                activeBundles.add(bundle);
                 if (bundleData.containsKey(bundle)) {
                     // If we recognize the bundle, add these stats as a new sample.
                     bundleData.get(bundle).update(stats);
@@ -557,6 +559,16 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
                 }
             }
 
+            //Remove not active bundle from loadData
+            for (String bundle : bundleData.keySet()) {
+                if (!activeBundles.contains(bundle)){
+                    bundleData.remove(bundle);
+                    if (pulsar.getLeaderElectionService().isLeader()){
+                        deleteBundleDataFromMetadataStore(bundle);
+                    }
+                }
+            }
+
             // Remove all loaded bundles from the preallocated maps.
             final Map<String, BundleData> preallocatedBundleData = brokerData.getPreallocatedBundleData();
             synchronized (preallocatedBundleData) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 5c0ffebbeec..36278f57dbd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -43,21 +43,29 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+
+import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -67,6 +75,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -655,6 +664,63 @@ public class NamespaceServiceTest extends BrokerTestBase {
                         NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
     }
 
+    @Test
+    public void testModularLoadManagerRemoveInactiveBundleFromLoadData() throws Exception {
+        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+        final String namespace = "pulsar/test/ns1";
+        final String topic1 = "persistent://" + namespace + "/topic1";
+        final String topic2 = "persistent://" + namespace + "/topic2";
+
+        // configure broker with ModularLoadManager
+        conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        restartBroker();
+
+        LoadManager loadManager = spy(pulsar.getLoadManager().get());
+        Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
+        loadManagerField.setAccessible(true);
+        doReturn(true).when(loadManager).isCentralized();
+        SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+        Optional<ResourceUnit> res = Optional.of(resourceUnit);
+        doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
+        loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name1").subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+                .subscriptionName("my-subscriber-name2").subscribe();
+
+        //create znode for bundle-data
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        //split bundle
+        NamespaceName nsname = NamespaceName.get(namespace);
+        NamespaceBundles bundles = pulsar.getNamespaceService().getNamespaceBundleFactory().getBundles(nsname);
+        NamespaceBundle oldBundle = bundles.findBundle(TopicName.get(topic1));
+        pulsar.getNamespaceService().splitAndOwnBundle(oldBundle, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO, null).get();
+
+        // update broker bundle report to zk
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        Field loadDataFiled = ModularLoadManagerImpl.class.getDeclaredField("loadData");
+        loadDataFiled.setAccessible(true);
+        LoadData loadData = (LoadData)loadDataFiled
+                .get((ModularLoadManagerImpl) ((ModularLoadManagerWrapper) loadManager).getLoadManager());
+        MetadataCache<BundleData> bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
+
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(loadData.getBundleData().get(oldBundle.toString()));
+            assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" + oldBundle.toString()).get());
+        });
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {