You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/16 07:23:06 UTC
[pulsar] branch branch-2.11 updated: fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 28f53b0214c fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
28f53b0214c is described below
commit 28f53b0214c3af8a8ea3ee58ca7d61e60831819f
Author: Lei Zhiyuan <le...@gmail.com>
AuthorDate: Tue Aug 16 10:53:31 2022 +0800
fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
Co-authored-by: zhiyuanlei <zh...@tencent.com>
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 4 +-
.../broker/namespace/NamespaceServiceTest.java | 58 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index fe6328dd102..4d8f49be965 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -649,7 +649,9 @@ public abstract class NamespacesBase extends AdminResource {
deleteTopicsFuture = FutureUtil.waitForAll(futures);
}
return deleteTopicsFuture.thenCompose(
- ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
+ ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle))
+ .thenRun(() -> pulsar().getBrokerService().getBundleStats()
+ .remove(bundle.toString()));
});
});
});
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 14fef03a30d..9f03c026225 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -721,6 +721,64 @@ public class NamespaceServiceTest extends BrokerTestBase {
});
}
+ @Test
+ public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
+ final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+ final String namespace = "prop/ns-abc";
+ final String topic1 = "persistent://" + namespace + "/topic1";
+ final String topic2 = "persistent://" + namespace + "/topic2";
+
+ // Restart the broker with ModularLoadManagerImpl as the load manager and with force-deletion of namespaces allowed.
+ conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ conf.setForceDeleteNamespaceAllowed(true);
+ restartBroker();
+
+ LoadManager loadManager = spy(pulsar.getLoadManager().get());
+ Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
+ loadManagerField.setAccessible(true);
+ doReturn(true).when(loadManager).isCentralized(); // stub the spy so it reports itself as centralized
+ loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager)); // inject the spy into NamespaceService's private field via reflection
+ NamespaceName nsname = NamespaceName.get(namespace);
+
+ @Cleanup
+ PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+ .subscriptionName("my-subscriber-name1").subscribe();
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+ .subscriptionName("my-subscriber-name2").subscribe();
+
+
+ NamespaceBundle bundle =
+ pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1));
+
+ loadManager.getLeastLoaded(bundle);
+
+ // Refresh the broker's rates, then write the load report and resource quotas so that the bundle-data entry asserted below exists.
+ pulsar.getBrokerService().updateRates();
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff"; // NOTE(review): assumes the namespace has one bundle covering the full hash range - confirm
+
+ Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get();
+ assertTrue(getResult.isPresent());
+
+ // Force-delete the namespace (second argument is true); this is expected to remove the bundle and its load data.
+ pulsar.getAdminClient().namespaces().deleteNamespace(nsname.toString(),true);
+
+ TimeUnit.SECONDS.sleep(5); // NOTE(review): fixed 5s wait, presumably to let asynchronous cleanup finish - polling with a timeout would be less flaky
+
+ // Write the load report and resource quotas again; the bundle-data entry must not reappear (asserted below).
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ getResult = pulsar.getLocalMetadataStore().get(path).get();
+ assertFalse(getResult.isPresent());
+
+ }
+
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {