You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 02:06:09 UTC

[pulsar] branch master updated: [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c67ded8f85 [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)
5c67ded8f85 is described below

commit 5c67ded8f858c54026ba69bd64854f885b55a5be
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Thu Sep 8 10:05:59 2022 +0800

    [fix][flaky-test]NamespaceServiceTest.flaky/testModularLoadManagerRemoveBundleAndLoad (#17487)
---
 .../broker/namespace/NamespaceServiceTest.java     | 68 ++++++++++++++++++----
 1 file changed, 58 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index cd615f6a0ec..03260aee46d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -77,6 +78,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
@@ -85,6 +87,7 @@ import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.awaitility.Awaitility;
 import org.mockito.stubbing.Answer;
+import org.powermock.reflect.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -746,19 +749,18 @@ public class NamespaceServiceTest extends BrokerTestBase {
     public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
         final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
         final String namespace = "prop/ns-abc";
+        final String bundleName = namespace + "/0x00000000_0xffffffff";
         final String topic1 = "persistent://" + namespace + "/topic1";
         final String topic2 = "persistent://" + namespace + "/topic2";
 
         // configure broker with ModularLoadManager
         conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
         conf.setForceDeleteNamespaceAllowed(true);
+        // Make sure LoadReportUpdaterTask has a 100% chance to write ZK.
+        conf.setLoadBalancerReportUpdateMaxIntervalMinutes(-1);
         restartBroker();
 
-        LoadManager loadManager = spy(pulsar.getLoadManager().get());
-        Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
-        loadManagerField.setAccessible(true);
-        doReturn(true).when(loadManager).isCentralized();
-        loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager));
+        LoadManager loadManager = pulsar.getLoadManager().get();
         NamespaceName nsname = NamespaceName.get(namespace);
 
         @Cleanup
@@ -778,10 +780,9 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
         //create znode for bundle-data
         pulsar.getBrokerService().updateRates();
-        loadManager.writeLoadReportOnZookeeper();
-        loadManager.writeResourceQuotasToZooKeeper();
 
-        String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff";
+        waitResourceDataUpdateToZK(loadManager);
+        String path = BUNDLE_DATA_PATH + "/" + bundleName;
 
         Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get();
         assertTrue(getResult.isPresent());
@@ -792,12 +793,59 @@ public class NamespaceServiceTest extends BrokerTestBase {
         TimeUnit.SECONDS.sleep(5);
 
         // update broker bundle report to zk
-        loadManager.writeLoadReportOnZookeeper();
-        loadManager.writeResourceQuotasToZooKeeper();
+        waitResourceDataUpdateToZK(loadManager);
 
         getResult = pulsar.getLocalMetadataStore().get(path).get();
         assertFalse(getResult.isPresent());
+    }
+
+    /**
+     * Writes the load report and the resource quotas to ZK, waiting in between for the load manager to catch up.
+     * 1. Register an extra metadata-store listener; its future is "waitForBrokerChangeNotice".
+     * 2. Manually trigger "LoadReportUpdaterTask" by calling writeLoadReportOnZookeeper().
+     * 3. Wait for "waitForBrokerChangeNotice". The extra listener was registered later than
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)}, so it is expected to run after it;
+     *    once the future is done, {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has returned.
+     *    That is still not enough, because
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)} submits an async task to
+     *    "ModularLoadManagerImpl.scheduler".
+     * 4. Submit a new task to "scheduler" (it is a single-thread executor).
+     * 5. Wait for the new task to finish; it is queued behind the async task from step 3, so by then
+     *    {@link ModularLoadManagerImpl#handleDataNotification(Notification)} has finished all of its work.
+     * 6. Manually trigger "LoadResourceQuotaUpdaterTask" by calling writeResourceQuotasToZooKeeper().
+     */
+    private void waitResourceDataUpdateToZK(LoadManager loadManager) throws Exception {
+        CompletableFuture<Void> waitForBrokerChangeNotice = registryBrokerDataChangeNotice();
+        // Manually trigger "LoadReportUpdaterTask" (after the listener is registered, so the change is not missed).
+        loadManager.writeLoadReportOnZookeeper();
+        waitForBrokerChangeNotice.join(); // NOTE(review): blocks without a timeout
+        // Wait until "ModularLoadManager" has finished processing the ZK notification (steps 4 and 5 above).
+        ModularLoadManagerWrapper modularLoadManagerWrapper = (ModularLoadManagerWrapper) loadManager;
+        ModularLoadManagerImpl modularLoadManager = (ModularLoadManagerImpl) modularLoadManagerWrapper.getLoadManager();
+        ScheduledExecutorService scheduler = Whitebox.getInternalState(modularLoadManager, "scheduler");
+        CompletableFuture<Void> waitForNoticeHandleFinishByLoadManager = new CompletableFuture<>();
+        scheduler.execute(() -> {
+            waitForNoticeHandleFinishByLoadManager.complete(null);
+        });
+        waitForNoticeHandleFinishByLoadManager.join(); // NOTE(review): blocks without a timeout
+        // Manually trigger "LoadResourceQuotaUpdaterTask".
+        loadManager.writeResourceQuotasToZooKeeper();
+    }
 
+    public CompletableFuture<Void> registryBrokerDataChangeNotice() {
+        CompletableFuture<Void> completableFuture = new CompletableFuture<>(); // completed by the listener below
+        String lookupServiceAddress = pulsar.getAdvertisedAddress() + ":"
+                + (conf.getWebServicePort().isPresent() ? conf.getWebServicePort().get()
+                : conf.getWebServicePortTls().get()); // use the TLS web port when no plain web port is configured
+        String brokerDataPath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress;
+        pulsar.getLocalMetadataStore().registerListener(notice -> {
+            if (brokerDataPath.equals(notice.getPath())){ // ignore notifications for other paths
+                if (!completableFuture.isDone()) { // later notifications leave the future untouched
+                    completableFuture.complete(null);
+                }
+            }
+        });
+        return completableFuture; // done after the first notification for brokerDataPath
     }
 
     @SuppressWarnings("unchecked")