You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/16 04:36:00 UTC

[pulsar] branch branch-2.11 updated: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 7ab000b5e97 [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)
7ab000b5e97 is described below

commit 7ab000b5e970cea57abc1f25638448d6e0313b99
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Mon Oct 16 12:35:52 2023 +0800

    [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)
---
 pulsar-metadata/pom.xml                            |   2 -
 .../PulsarLedgerUnderreplicationManager.java       |  76 +++--
 .../replication/AuditorLedgerCheckerTest.java      | 312 +++++++++++++++++++++
 .../LedgerUnderreplicationManagerTest.java         |  19 +-
 4 files changed, 379 insertions(+), 30 deletions(-)

diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 5181f01a55a..099f9bf90ec 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -57,7 +57,6 @@
       </exclusions>
     </dependency>
 
-    <!-- zookeeper server -->
     <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
@@ -85,7 +84,6 @@
       </exclusions>
     </dependency>
 
-    <!-- zookeeper server -->
     <dependency>
       <groupId>org.xerial.snappy</groupId>
       <artifactId>snappy-java</artifactId>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index eeedf54f3bb..11f6f2f7ddc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -105,14 +105,17 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
     private final String urLockPath;
     private final String layoutPath;
     private final String lostBookieRecoveryDelayPath;
+    private final String replicationDisablePath;
     private final String checkAllLedgersCtimePath;
     private final String placementPolicyCheckCtimePath;
     private final String replicasCheckCtimePath;
 
     private final MetadataStoreExtended store;
 
-    private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
-    private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;
+    private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> replicationEnabledCallbacks =
+            new ArrayList<>();
+    private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> lostBookieRecoveryDelayCallbacks =
+            new ArrayList<>();
 
     private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger {
         PulsarUnderreplicatedLedger(long ledgerId) {
@@ -139,6 +142,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
         urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
         urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
         lostBookieRecoveryDelayPath = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
+        replicationDisablePath = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
         checkAllLedgersCtimePath = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
         placementPolicyCheckCtimePath = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
         replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
@@ -232,17 +236,34 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
             synchronized (this) {
                 // Notify that there were some changes on the under-replicated z-nodes
                 notifyAll();
-
-                if (n.getType() == NotificationType.Deleted) {
-                    if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) {
-                        log.info("LedgerReplication is enabled externally through MetadataStore, "
-                                + "since DISABLE_NODE ZNode is deleted");
-                        if (replicationEnabledListener != null) {
-                            replicationEnabledListener.operationComplete(0, null);
+                if (lostBookieRecoveryDelayPath.equals(n.getPath())) {
+                    final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
+                    synchronized (lostBookieRecoveryDelayCallbacks) {
+                        callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks);
+                        lostBookieRecoveryDelayCallbacks.clear();
+                    }
+                    for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
+                        try {
+                            callback.operationComplete(0, null);
+                        } catch (Exception e) {
+                            log.warn("lostBookieRecoveryDelayCallbacks handle error", e);
                         }
-                    } else if (n.getPath().equals(lostBookieRecoveryDelayPath)) {
-                        if (lostBookieRecoveryDelayListener != null) {
-                            lostBookieRecoveryDelayListener.operationComplete(0, null);
+                    }
+                    return;
+                }
+                if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) {
+                    log.info("LedgerReplication is enabled externally through MetadataStore, "
+                            + "since DISABLE_NODE ZNode is deleted");
+                    final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
+                    synchronized (replicationEnabledCallbacks) {
+                        callbackList = new ArrayList<>(replicationEnabledCallbacks);
+                        replicationEnabledCallbacks.clear();
+                    }
+                    for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
+                        try {
+                            callback.operationComplete(0, null);
+                        } catch (Exception e) {
+                            log.warn("replicationEnabledCallbacks handle error", e);
                         }
                     }
                 }
@@ -678,10 +699,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
             log.debug("disableLedegerReplication()");
         }
         try {
-            String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
-            store.put(path, "".getBytes(UTF_8), Optional.of(-1L)).get();
+            store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L))
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             log.info("Auto ledger re-replication is disabled!");
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Exception while stopping auto ledger re-replication", ee);
             throw new ReplicationException.UnavailableException(
                     "Exception while stopping auto ledger re-replication", ee);
@@ -699,9 +720,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
             log.debug("enableLedegerReplication()");
         }
         try {
-            store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, Optional.empty()).get();
+            store.delete(replicationDisablePath, Optional.empty())
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
             log.info("Resuming automatic ledger re-replication");
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Exception while resuming ledger replication", ee);
             throw new ReplicationException.UnavailableException(
                     "Exception while resuming auto ledger re-replication", ee);
@@ -719,8 +741,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
             log.debug("isLedgerReplicationEnabled()");
         }
         try {
-            return !store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get();
-        } catch (ExecutionException ee) {
+            return !store.exists(replicationDisablePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while checking the state of "
                     + "ledger re-replication", ee);
             throw new ReplicationException.UnavailableException(
@@ -738,19 +761,18 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
         if (log.isDebugEnabled()) {
             log.debug("notifyLedgerReplicationEnabled()");
         }
-
-        synchronized (this) {
-            replicationEnabledListener = cb;
+        synchronized (replicationEnabledCallbacks) {
+            replicationEnabledCallbacks.add(cb);
         }
-
         try {
-            if (!store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get()) {
+            if (!store.exists(replicationDisablePath)
+                    .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
                 log.info("LedgerReplication is enabled externally through metadata store, "
                         + "since DISABLE_NODE node is deleted");
                 cb.operationComplete(0, null);
                 return;
             }
-        } catch (ExecutionException ee) {
+        } catch (ExecutionException | TimeoutException ee) {
             log.error("Error while checking the state of "
                     + "ledger re-replication", ee);
             throw new ReplicationException.UnavailableException(
@@ -833,8 +855,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
     public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws
             ReplicationException.UnavailableException {
         log.debug("notifyLostBookieRecoveryDelayChanged()");
-        synchronized (this) {
-            lostBookieRecoveryDelayListener = cb;
+        synchronized (lostBookieRecoveryDelayCallbacks) {
+            lostBookieRecoveryDelayCallbacks.add(cb);
         }
         try {
             if (!store.exists(lostBookieRecoveryDelayPath).get()) {
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
new file mode 100644
index 00000000000..e9db681691f
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertTrue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests publishing of under replicated ledgers by the Auditor bookie node when
+ * corresponding bookies identifes as not running.
+ */
+public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
+
+    // Depending on the taste, select the amount of logging
+    // by decommenting one of the two lines below
+    // private static final Logger LOG = Logger.getRootLogger();
+    private static final Logger LOG = LoggerFactory
+            .getLogger(AuditorLedgerCheckerTest.class);
+
+    private static final byte[] ledgerPassword = "aaa".getBytes();
+    private Random rng; // Random Number Generator
+
+    private DigestType digestType;
+
+    private String underreplicatedPath;
+    private Map<String, AuditorElector> auditorElectors = new ConcurrentHashMap<>();
+    private ZkLedgerUnderreplicationManager urLedgerMgr;
+    private Set<Long> urLedgerList;
+    private String electionPath;
+
+    private List<Long> ledgerList;
+
+    public AuditorLedgerCheckerTest()
+            throws IOException, KeeperException, InterruptedException,
+            CompatibilityException {
+        this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+    }
+
+    AuditorLedgerCheckerTest(String ledgerManagerFactoryClass)
+            throws IOException, KeeperException, InterruptedException,
+            CompatibilityException {
+        super(3);
+        LOG.info("Running test case using ledger manager : "
+                + ledgerManagerFactoryClass);
+        this.digestType = DigestType.CRC32;
+        // set ledger manager name
+        baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
+        baseClientConf
+                .setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
+    }
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        super.setUp();
+        underreplicatedPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf)
+                + "/underreplication/ledgers";
+        electionPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf)
+                + "/underreplication/auditorelection";
+
+        urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+        urLedgerMgr.setCheckAllLedgersCTime(System.currentTimeMillis());
+        startAuditorElectors();
+        rng = new Random(System.currentTimeMillis()); // Initialize the Random
+        urLedgerList = new HashSet<Long>();
+        ledgerList = new ArrayList<Long>(2);
+        baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        stopAuditorElectors();
+        super.tearDown();
+    }
+
+    private void startAuditorElectors() throws Exception {
+        for (String addr : bookieAddresses().stream().map(Object::toString)
+                .collect(Collectors.toList())) {
+            AuditorElector auditorElector = new AuditorElector(addr, baseConf);
+            auditorElectors.put(addr, auditorElector);
+            auditorElector.start();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Starting Auditor Elector");
+            }
+        }
+    }
+
+    private void stopAuditorElectors() throws Exception {
+        for (AuditorElector auditorElector : auditorElectors.values()) {
+            auditorElector.shutdown();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping Auditor Elector!");
+            }
+        }
+    }
+
+    @Test
+    public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception {
+        // wait for a second so that the initial periodic check finishes
+        Thread.sleep(1000);
+
+        Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+        LedgerHandle lh1 = createAndAddEntriesToLedger();
+        Long ledgerId = lh1.getId();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Created ledger : " + ledgerId);
+        }
+        ledgerList.add(ledgerId);
+        lh1.close();
+
+        final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+                .size());
+
+        int lostBookieRecoveryDelay = 5;
+        // wait for 5 seconds before starting the recovery work when a bookie fails
+        urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+
+        // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+        String shutdownBookie = shutDownNonAuditorBookie();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting for ledgers to be marked as under replicated");
+        }
+        assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+        assertEquals("under replicated ledgers identified when it was not expected", 0,
+                urLedgerList.size());
+
+        Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+        assertNotSame("auditTask is not supposed to be null", null, auditTask);
+        assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+                lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+
+        // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately
+        urLedgerMgr.setLostBookieRecoveryDelay(0);
+        assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS));
+        assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+                urLedgerList.size());
+
+        Thread.sleep(100);
+        auditTask = auditorBookiesAuditor.getAuditTask();
+        assertEquals("auditTask is supposed to be null", null, auditTask);
+        assertEquals(
+                "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
+                0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+    }
+
+    private CountDownLatch registerUrLedgerWatcher(int count)
+            throws KeeperException, InterruptedException {
+        final CountDownLatch underReplicaLatch = new CountDownLatch(count);
+        for (Long ledgerId : ledgerList) {
+            Watcher urLedgerWatcher = new ChildWatcher(underReplicaLatch);
+            String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(underreplicatedPath,
+                    ledgerId);
+            zkc.exists(znode, urLedgerWatcher);
+        }
+        return underReplicaLatch;
+    }
+
+    private String shutdownBookie(int bkShutdownIndex) throws Exception {
+        BookieServer bkServer = serverByIndex(bkShutdownIndex);
+        String bookieAddr = bkServer.getBookieId().toString();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Shutting down bookie:" + bookieAddr);
+        }
+        killBookie(bkShutdownIndex);
+        auditorElectors.get(bookieAddr).shutdown();
+        auditorElectors.remove(bookieAddr);
+        return bookieAddr;
+    }
+
+    private LedgerHandle createAndAddEntriesToLedger() throws BKException,
+            InterruptedException {
+        int numEntriesToWrite = 100;
+        // Create a ledger
+        LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
+        LOG.info("Ledger ID: " + lh.getId());
+        addEntry(numEntriesToWrite, lh);
+        return lh;
+    }
+
+    private void addEntry(int numEntriesToWrite, LedgerHandle lh)
+            throws InterruptedException, BKException {
+        final CountDownLatch completeLatch = new CountDownLatch(numEntriesToWrite);
+        final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+
+        for (int i = 0; i < numEntriesToWrite; i++) {
+            ByteBuffer entry = ByteBuffer.allocate(4);
+            entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+            entry.position(0);
+            lh.asyncAddEntry(entry.array(), new AddCallback() {
+                public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
+                    rc.compareAndSet(BKException.Code.OK, rc2);
+                    completeLatch.countDown();
+                }
+            }, null);
+        }
+        completeLatch.await();
+        if (rc.get() != BKException.Code.OK) {
+            throw BKException.create(rc.get());
+        }
+
+    }
+
+    private class ChildWatcher implements Watcher {
+        private final CountDownLatch underReplicaLatch;
+
+        public ChildWatcher(CountDownLatch underReplicaLatch) {
+            this.underReplicaLatch = underReplicaLatch;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info("Received notification for the ledger path : "
+                    + event.getPath());
+            for (Long ledgerId : ledgerList) {
+                if (event.getPath().contains(ledgerId + "")) {
+                    urLedgerList.add(ledgerId);
+                }
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Count down and waiting for next notification");
+            }
+            // count down and waiting for next notification
+            underReplicaLatch.countDown();
+        }
+    }
+
+    private BookieServer getAuditorBookie() throws Exception {
+        List<BookieServer> auditors = new LinkedList<BookieServer>();
+        byte[] data = zkc.getData(electionPath, false, null);
+        assertNotNull("Auditor election failed", data);
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieId bookieId = addressByIndex(i);
+            if (new String(data).contains(bookieId + "")) {
+                auditors.add(serverByIndex(i));
+            }
+        }
+        assertEquals("Multiple Bookies acting as Auditor!", 1, auditors
+                .size());
+        return auditors.get(0);
+    }
+
+    private Auditor getAuditorBookiesAuditor() throws Exception {
+        BookieServer auditorBookieServer = getAuditorBookie();
+        String bookieAddr = auditorBookieServer.getBookieId().toString();
+        return auditorElectors.get(bookieAddr).auditor;
+    }
+
+    private String  shutDownNonAuditorBookie() throws Exception {
+        // shutdown bookie which is not an auditor
+        int indexOf = indexOfServer(getAuditorBookie());
+        int bkIndexDownBookie;
+        if (indexOf < lastBookieIndex()) {
+            bkIndexDownBookie = indexOf + 1;
+        } else {
+            bkIndexDownBookie = indexOf - 1;
+        }
+        return shutdownBookie(bkIndexDownBookie);
+    }
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 1c8b62642da..115634638ce 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 import lombok.Cleanup;
@@ -613,6 +614,8 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
         final String missingReplica = "localhost:3181";
 
         // disabling replication
+        AtomicInteger callbackCount = new AtomicInteger();
+        lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
         lum.disableLedgerReplication();
         log.info("Disabled Ledeger Replication");
 
@@ -630,6 +633,7 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
         } catch (TimeoutException te) {
             // expected behaviour, as the replication is disabled
         }
+        assertEquals(callbackCount.get(), 1, "Notify callback times mismatch");
     }
 
     /**
@@ -650,7 +654,8 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
             log.debug("Unexpected exception while marking urLedger", e);
             fail("Unexpected exception while marking urLedger" + e.getMessage());
         }
-
+        AtomicInteger callbackCount = new AtomicInteger();
+        lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
         // disabling replication
         lum.disableLedgerReplication();
         log.debug("Disabled Ledeger Replication");
@@ -687,6 +692,7 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
             znodeLatch.await(5, TimeUnit.SECONDS);
             log.debug("Enabled Ledeger Replication");
             assertEquals(znodeLatch.getCount(), 0, "Failed to disable ledger replication!");
+            assertEquals(callbackCount.get(), 2, "Notify callback times mismatch");
         } finally {
             thread1.interrupt();
         }
@@ -748,6 +754,17 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
         assertEquals(underReplicaMgr1.getReplicasCheckCTime(), curTime);
     }
 
+    @Test(timeOut = 60000, dataProvider = "impl")
+    public void testLostBookieRecoveryDelay(String provider, Supplier<String> urlSupplier) throws Exception {
+        methodSetup(urlSupplier);
+        AtomicInteger callbackCount = new AtomicInteger();
+        lum.notifyLostBookieRecoveryDelayChanged((rc, result) -> callbackCount.incrementAndGet());
+        // changing the lost bookie recovery delay must notify the registered callback
+        lum.setLostBookieRecoveryDelay(10);
+        // two calls in total: presumably one at registration and one for the change above
+        Awaitility.await().until(() -> callbackCount.get() == 2);
+    }
+
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica) throws Exception {
         Long ledgerA = 0xfeadeefdacL;
         String znodeA = getUrLedgerZnode(ledgerA);