You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 02:06:09 UTC
[pulsar] branch master updated: [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5c67ded8f85 [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)
5c67ded8f85 is described below
commit 5c67ded8f858c54026ba69bd64854f885b55a5be
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Sep 8 10:05:59 2022 +0800
[fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)
---
.../broker/namespace/NamespaceServiceTest.java | 68 ++++++++++++++++++----
1 file changed, 58 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index cd615f6a0ec..03260aee46d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -77,6 +78,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -85,6 +87,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -746,19 +749,18 @@ public class NamespaceServiceTest extends BrokerTestBase {
public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
final String namespace = "prop/ns-abc";
+ final String bundleName = namespace + "/0x00000000_0xffffffff";
final String topic1 = "persistent://" + namespace + "/topic1";
final String topic2 = "persistent://" + namespace + "/topic2";
// configure broker with ModularLoadManager
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
conf.setForceDeleteNamespaceAllowed(true);
+ // Make sure LoadReportUpdaterTask has a 100% chance to write ZK.
+ conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1);
restartBroker();
- LoadManager loadManager = spy(pulsar.getLoadManager().get());
- Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
- loadManagerField.setAccessible(true);
- doReturn(true).when(loadManager).isCentralized();
- loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
+ LoadManager loadManager = pulsar.getLoadManager().get();
NamespaceName nsname = NamespaceName.get(namespace);
@Cleanup
@@ -778,10 +780,9 @@ public class NamespaceServiceTest extends BrokerTestBase {
//create znode for bundle-data
pulsar.getBrokerService().updateRates();
- loadManager.writeLoadReportOnZookeeper();
- loadManager.writeResourceQuotasToZooKeeper();
- String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff";
+ waitResourceDataUpdateToZK(loadManager);
+ String path = BUNDLE_DATA_PATH + "/" + bundleName;
Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get();
assertTrue(getResult.isPresent());
@@ -792,12 +793,59 @@ public class NamespaceServiceTest extends BrokerTestBase {
TimeUnit.SECONDS.sleep(5);
// update broker bundle report to zk
- loadManager.writeLoadReportOnZookeeper();
- loadManager.writeResourceQuotasToZooKeeper();
+ waitResourceDataUpdateToZK(loadManager);
getResult = pulsar.getLocalMetadataStore().get(path).get();
assertFalse(getResult.isPresent());
+ }
+
+ /**
+ * 1. Manually trigger "LoadReportUpdaterTask"
+ * 2. Registry another new zk-node-listener "waitForBrokerChangeNotice".
+ * 3. Wait "waitForBrokerChangeNotice" is done, this task will be executed after
+ * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}, because it is registry later than
+ * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}. So if "waitForBrokerChangeNotice" is done
+ * we can guarantee {@link ModularLoadManagerImpl#handleDataNotification(Notification)} is done. At this time
+ * we still could not guarantee {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has
+ * finished all things, because there has a async task be submitted to "ModularLoadManagerImpl.scheduler" by
+ * {@link ModularLoadManagerImpl#handleDataNotification(Notification)}.
+ * 4. Submit a new task to "scheduler"(it is a singleton thread executor).
+ * 5. Wait the new task done, if the new task done, we can guarantee
+ * {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has finished all things.
+ * 6. Manually trigger "LoadResourceQuotaUpdaterTask".
+ */
+ private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exception {
+ CompletableFuture<Void> waitForBrokerChangeNotice = registryBrokerDataChangeNotice();
+ // Manually trigger "LoadReportUpdaterTask"
+ loadManager.writeLoadReportOnZookeeper();
+ waitForBrokerChangeNotice.join();
+ // Wait until "ModularLoadManager" completes processing the ZK notification.
+ ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager;
+ ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();
+ ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler");
+ CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>();
+ scheduler.execute(() -> {
+ waitForNoticeHandleFinishByLoadManager.complete(null);
+ });
+ waitForNoticeHandleFinishByLoadManager.join();
+ // Manually trigger "LoadResourceQuotaUpdaterTask"
+ loadManager.writeResourceQuotasToZooKeeper();
+ }
+ public CompletableFuture<Void> registryBrokerDataChangeNotice() {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+ + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
+ : conf.getWebServicePortTls().get());
+ String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+ pulsar.getLocalMetadataStore().registerListener(notice -> {
+ if (brokerDataPath.equals(notice.getPath())){
+ if (!completableFuture.isDone()) {
+ completableFuture.complete(null);
+ }
+ }
+ });
+ return completableFuture;
}
@SuppressWarnings("unchecked")