You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/16 07:23:06 UTC

[pulsar] branch branch-2.11 updated: fix: bundle-data metadata leak because of bundlestats was not clean (#17095)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 28f53b0214c fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
28f53b0214c is described below

commit 28f53b0214c3af8a8ea3ee58ca7d61e60831819f
Author: Lei Zhiyuan <le...@gmail.com>
AuthorDate: Tue Aug 16 10:53:31 2022 +0800

    fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
    
    Co-authored-by: zhiyuanlei <zh...@tencent.com>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +-
 .../broker/namespace/NamespaceServiceTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index fe6328dd102..4d8f49be965 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -649,7 +649,9 @@ public abstract class NamespacesBase extends AdminResource {
                                         deleteTopicsFuture = FutureUtil.waitForAll(futures);
                                     }
                                     return deleteTopicsFuture.thenCompose(
-                                            ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
+                                            ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle))
+                                            .thenRun(() -> pulsar().getBrokerService().getBundleStats()
+                                                    .remove(bundle.toString()));
                                 });
                     });
                 });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 14fef03a30d..9f03c026225 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -721,6 +721,64 @@ public class NamespaceServiceTest extends BrokerTestBase {
         });
     }
 
+    @Test
+    public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
+        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+        final String namespace = "prop/ns-abc";
+        final String topic1 = "persistent://" + namespace + "/topic1";
+        final String topic2 = "persistent://" + namespace + "/topic2";
+
+        // configure broker with ModularLoadManager
+        conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        conf.setForceDeleteNamespaceAllowed(true);
+        restartBroker();
+
+        LoadManager loadManager = spy(pulsar.getLoadManager().get());
+        Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
+        loadManagerField.setAccessible(true);
+        doReturn(true).when(loadManager).isCentralized();
+        loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
+        NamespaceName nsname = NamespaceName.get(namespace);
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name1").subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+                .subscriptionName("my-subscriber-name2").subscribe();
+
+
+        NamespaceBundle bundle =
+                pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1));
+
+        loadManager.getLeastLoaded(bundle);
+
+        //create znode for bundle-data
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff";
+
+        Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get();
+        assertTrue(getResult.isPresent());
+
+        //delete namespace which will remove bundle and load
+        pulsar.getAdminClient().namespaces().deleteNamespace(nsname.toString(),true);
+
+        TimeUnit.SECONDS.sleep(5);
+
+        // update broker bundle report to zk
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        getResult = pulsar.getLocalMetadataStore().get(path).get();
+        assertFalse(getResult.isPresent());
+
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {