You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/10/16 04:36:00 UTC
[pulsar] branch branch-2.11 updated: [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 7ab000b5e97 [fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)
7ab000b5e97 is described below
commit 7ab000b5e970cea57abc1f25638448d6e0313b99
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Mon Oct 16 12:35:52 2023 +0800
[fix] [auto-recovery] Fix PulsarLedgerUnderreplicationManager notify problem. (#21318)
---
pulsar-metadata/pom.xml | 2 -
.../PulsarLedgerUnderreplicationManager.java | 76 +++--
.../replication/AuditorLedgerCheckerTest.java | 312 +++++++++++++++++++++
.../LedgerUnderreplicationManagerTest.java | 19 +-
4 files changed, 379 insertions(+), 30 deletions(-)
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index 5181f01a55a..099f9bf90ec 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -57,7 +57,6 @@
</exclusions>
</dependency>
- <!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
@@ -85,7 +84,6 @@
</exclusions>
</dependency>
- <!-- zookeeper server -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
index eeedf54f3bb..11f6f2f7ddc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java
@@ -105,14 +105,17 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
private final String urLockPath;
private final String layoutPath;
private final String lostBookieRecoveryDelayPath;
+ private final String replicationDisablePath;
private final String checkAllLedgersCtimePath;
private final String placementPolicyCheckCtimePath;
private final String replicasCheckCtimePath;
private final MetadataStoreExtended store;
- private BookkeeperInternalCallbacks.GenericCallback<Void> replicationEnabledListener;
- private BookkeeperInternalCallbacks.GenericCallback<Void> lostBookieRecoveryDelayListener;
+ private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> replicationEnabledCallbacks =
+ new ArrayList<>();
+ private final List<BookkeeperInternalCallbacks.GenericCallback<Void>> lostBookieRecoveryDelayCallbacks =
+ new ArrayList<>();
private static class PulsarUnderreplicatedLedger extends UnderreplicatedLedger {
PulsarUnderreplicatedLedger(long ledgerId) {
@@ -139,6 +142,7 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
urLedgerPath = basePath + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;
urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
lostBookieRecoveryDelayPath = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
+ replicationDisablePath = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
checkAllLedgersCtimePath = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
placementPolicyCheckCtimePath = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
replicasCheckCtimePath = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
@@ -232,17 +236,34 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
synchronized (this) {
// Notify that there were some changes on the under-replicated z-nodes
notifyAll();
-
- if (n.getType() == NotificationType.Deleted) {
- if (n.getPath().equals(basePath + '/' + BookKeeperConstants.DISABLE_NODE)) {
- log.info("LedgerReplication is enabled externally through MetadataStore, "
- + "since DISABLE_NODE ZNode is deleted");
- if (replicationEnabledListener != null) {
- replicationEnabledListener.operationComplete(0, null);
+ if (lostBookieRecoveryDelayPath.equals(n.getPath())) {
+ final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
+ synchronized (lostBookieRecoveryDelayCallbacks) {
+ callbackList = new ArrayList<>(lostBookieRecoveryDelayCallbacks);
+ lostBookieRecoveryDelayCallbacks.clear();
+ }
+ for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
+ try {
+ callback.operationComplete(0, null);
+ } catch (Exception e) {
+ log.warn("lostBookieRecoveryDelayCallbacks handle error", e);
}
- } else if (n.getPath().equals(lostBookieRecoveryDelayPath)) {
- if (lostBookieRecoveryDelayListener != null) {
- lostBookieRecoveryDelayListener.operationComplete(0, null);
+ }
+ return;
+ }
+ if (replicationDisablePath.equals(n.getPath()) && n.getType() == NotificationType.Deleted) {
+ log.info("LedgerReplication is enabled externally through MetadataStore, "
+ + "since DISABLE_NODE ZNode is deleted");
+ final List<BookkeeperInternalCallbacks.GenericCallback<Void>> callbackList;
+ synchronized (replicationEnabledCallbacks) {
+ callbackList = new ArrayList<>(replicationEnabledCallbacks);
+ replicationEnabledCallbacks.clear();
+ }
+ for (BookkeeperInternalCallbacks.GenericCallback<Void> callback : callbackList) {
+ try {
+ callback.operationComplete(0, null);
+ } catch (Exception e) {
+ log.warn("replicationEnabledCallbacks handle error", e);
}
}
}
@@ -678,10 +699,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
log.debug("disableLedegerReplication()");
}
try {
- String path = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
- store.put(path, "".getBytes(UTF_8), Optional.of(-1L)).get();
+ store.put(replicationDisablePath, "".getBytes(UTF_8), Optional.of(-1L))
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Auto ledger re-replication is disabled!");
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Exception while stopping auto ledger re-replication", ee);
throw new ReplicationException.UnavailableException(
"Exception while stopping auto ledger re-replication", ee);
@@ -699,9 +720,10 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
log.debug("enableLedegerReplication()");
}
try {
- store.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, Optional.empty()).get();
+ store.delete(replicationDisablePath, Optional.empty())
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
log.info("Resuming automatic ledger re-replication");
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Exception while resuming ledger replication", ee);
throw new ReplicationException.UnavailableException(
"Exception while resuming auto ledger re-replication", ee);
@@ -719,8 +741,9 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
log.debug("isLedgerReplicationEnabled()");
}
try {
- return !store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get();
- } catch (ExecutionException ee) {
+ return !store.exists(replicationDisablePath)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of "
+ "ledger re-replication", ee);
throw new ReplicationException.UnavailableException(
@@ -738,19 +761,18 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
if (log.isDebugEnabled()) {
log.debug("notifyLedgerReplicationEnabled()");
}
-
- synchronized (this) {
- replicationEnabledListener = cb;
+ synchronized (replicationEnabledCallbacks) {
+ replicationEnabledCallbacks.add(cb);
}
-
try {
- if (!store.exists(basePath + '/' + BookKeeperConstants.DISABLE_NODE).get()) {
+ if (!store.exists(replicationDisablePath)
+ .get(BLOCKING_CALL_TIMEOUT, MILLISECONDS)) {
log.info("LedgerReplication is enabled externally through metadata store, "
+ "since DISABLE_NODE node is deleted");
cb.operationComplete(0, null);
return;
}
- } catch (ExecutionException ee) {
+ } catch (ExecutionException | TimeoutException ee) {
log.error("Error while checking the state of "
+ "ledger re-replication", ee);
throw new ReplicationException.UnavailableException(
@@ -833,8 +855,8 @@ public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicati
public void notifyLostBookieRecoveryDelayChanged(BookkeeperInternalCallbacks.GenericCallback<Void> cb) throws
ReplicationException.UnavailableException {
log.debug("notifyLostBookieRecoveryDelayChanged()");
- synchronized (this) {
- lostBookieRecoveryDelayListener = cb;
+ synchronized (lostBookieRecoveryDelayCallbacks) {
+ lostBookieRecoveryDelayCallbacks.add(cb);
}
try {
if (!store.exists(lostBookieRecoveryDelayPath).get()) {
diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
new file mode 100644
index 00000000000..e9db681691f
--- /dev/null
+++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.replication;
+
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertNotNull;
+import static org.testng.AssertJUnit.assertNotSame;
+import static org.testng.AssertJUnit.assertTrue;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests publishing of under-replicated ledgers by the Auditor bookie node when
+ * the corresponding bookies are identified as not running.
+ */
+public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
+
+ // Depending on taste, select the amount of logging
+ // by uncommenting one of the two lines below
+ // private static final Logger LOG = Logger.getRootLogger();
+ private static final Logger LOG = LoggerFactory
+ .getLogger(AuditorLedgerCheckerTest.class);
+
+ private static final byte[] ledgerPassword = "aaa".getBytes();
+ private Random rng; // Random Number Generator
+
+ private DigestType digestType;
+
+ private String underreplicatedPath;
+ private Map<String, AuditorElector> auditorElectors = new ConcurrentHashMap<>();
+ private ZkLedgerUnderreplicationManager urLedgerMgr;
+ private Set<Long> urLedgerList;
+ private String electionPath;
+
+ private List<Long> ledgerList;
+
+ public AuditorLedgerCheckerTest()
+ throws IOException, KeeperException, InterruptedException,
+ CompatibilityException {
+ this("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
+ }
+
+ AuditorLedgerCheckerTest(String ledgerManagerFactoryClass)
+ throws IOException, KeeperException, InterruptedException,
+ CompatibilityException {
+ super(3);
+ LOG.info("Running test case using ledger manager : "
+ + ledgerManagerFactoryClass);
+ this.digestType = DigestType.CRC32;
+ // set ledger manager name
+ baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
+ baseClientConf
+ .setLedgerManagerFactoryClassName(ledgerManagerFactoryClass);
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ super.setUp();
+ underreplicatedPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf)
+ + "/underreplication/ledgers";
+ electionPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf)
+ + "/underreplication/auditorelection";
+
+ urLedgerMgr = new ZkLedgerUnderreplicationManager(baseClientConf, zkc);
+ urLedgerMgr.setCheckAllLedgersCTime(System.currentTimeMillis());
+ startAuditorElectors();
+ rng = new Random(System.currentTimeMillis()); // Initialize the Random
+ urLedgerList = new HashSet<Long>();
+ ledgerList = new ArrayList<Long>(2);
+ baseClientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ stopAuditorElectors();
+ super.tearDown();
+ }
+
+ private void startAuditorElectors() throws Exception {
+ for (String addr : bookieAddresses().stream().map(Object::toString)
+ .collect(Collectors.toList())) {
+ AuditorElector auditorElector = new AuditorElector(addr, baseConf);
+ auditorElectors.put(addr, auditorElector);
+ auditorElector.start();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting Auditor Elector");
+ }
+ }
+ }
+
+ private void stopAuditorElectors() throws Exception {
+ for (AuditorElector auditorElector : auditorElectors.values()) {
+ auditorElector.shutdown();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stopping Auditor Elector!");
+ }
+ }
+ }
+
+ @Test
+ public void testTriggerAuditorBySettingDelayToZeroWithPendingAuditTask() throws Exception {
+ // wait for a second so that the initial periodic check finishes
+ Thread.sleep(1000);
+
+ Auditor auditorBookiesAuditor = getAuditorBookiesAuditor();
+ LedgerHandle lh1 = createAndAddEntriesToLedger();
+ Long ledgerId = lh1.getId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created ledger : " + ledgerId);
+ }
+ ledgerList.add(ledgerId);
+ lh1.close();
+
+ final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
+ .size());
+
+ int lostBookieRecoveryDelay = 5;
+ // wait for 5 seconds before starting the recovery work when a bookie fails
+ urLedgerMgr.setLostBookieRecoveryDelay(lostBookieRecoveryDelay);
+
+ // shutdown a non auditor bookie; choosing non-auditor to avoid another election
+ String shutdownBookie = shutDownNonAuditorBookie();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting for ledgers to be marked as under replicated");
+ }
+ assertFalse("audit of lost bookie isn't delayed", underReplicaLatch.await(2, TimeUnit.SECONDS));
+ assertEquals("under replicated ledgers identified when it was not expected", 0,
+ urLedgerList.size());
+
+ Future<?> auditTask = auditorBookiesAuditor.getAuditTask();
+ assertNotSame("auditTask is not supposed to be null", null, auditTask);
+ assertEquals(
+ "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to what we set",
+ lostBookieRecoveryDelay, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+
+ // set lostBookieRecoveryDelay to 0, so that Auditor is triggered immediately
+ urLedgerMgr.setLostBookieRecoveryDelay(0);
+ assertTrue("audit of lost bookie shouldn't be delayed", underReplicaLatch.await(1, TimeUnit.SECONDS));
+ assertEquals("all under replicated ledgers should be identified", ledgerList.size(),
+ urLedgerList.size());
+
+ Thread.sleep(100);
+ auditTask = auditorBookiesAuditor.getAuditTask();
+ assertEquals("auditTask is supposed to be null", null, auditTask);
+ assertEquals(
+ "lostBookieRecoveryDelayBeforeChange of Auditor should be equal to previously set value",
+ 0, auditorBookiesAuditor.getLostBookieRecoveryDelayBeforeChange());
+ }
+
+ private CountDownLatch registerUrLedgerWatcher(int count)
+ throws KeeperException, InterruptedException {
+ final CountDownLatch underReplicaLatch = new CountDownLatch(count);
+ for (Long ledgerId : ledgerList) {
+ Watcher urLedgerWatcher = new ChildWatcher(underReplicaLatch);
+ String znode = ZkLedgerUnderreplicationManager.getUrLedgerZnode(underreplicatedPath,
+ ledgerId);
+ zkc.exists(znode, urLedgerWatcher);
+ }
+ return underReplicaLatch;
+ }
+
+ private String shutdownBookie(int bkShutdownIndex) throws Exception {
+ BookieServer bkServer = serverByIndex(bkShutdownIndex);
+ String bookieAddr = bkServer.getBookieId().toString();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Shutting down bookie:" + bookieAddr);
+ }
+ killBookie(bkShutdownIndex);
+ auditorElectors.get(bookieAddr).shutdown();
+ auditorElectors.remove(bookieAddr);
+ return bookieAddr;
+ }
+
+ private LedgerHandle createAndAddEntriesToLedger() throws BKException,
+ InterruptedException {
+ int numEntriesToWrite = 100;
+ // Create a ledger
+ LedgerHandle lh = bkc.createLedger(digestType, ledgerPassword);
+ LOG.info("Ledger ID: " + lh.getId());
+ addEntry(numEntriesToWrite, lh);
+ return lh;
+ }
+
+ private void addEntry(int numEntriesToWrite, LedgerHandle lh)
+ throws InterruptedException, BKException {
+ final CountDownLatch completeLatch = new CountDownLatch(numEntriesToWrite);
+ final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
+
+ for (int i = 0; i < numEntriesToWrite; i++) {
+ ByteBuffer entry = ByteBuffer.allocate(4);
+ entry.putInt(rng.nextInt(Integer.MAX_VALUE));
+ entry.position(0);
+ lh.asyncAddEntry(entry.array(), new AddCallback() {
+ public void addComplete(int rc2, LedgerHandle lh, long entryId, Object ctx) {
+ rc.compareAndSet(BKException.Code.OK, rc2);
+ completeLatch.countDown();
+ }
+ }, null);
+ }
+ completeLatch.await();
+ if (rc.get() != BKException.Code.OK) {
+ throw BKException.create(rc.get());
+ }
+
+ }
+
+ private class ChildWatcher implements Watcher {
+ private final CountDownLatch underReplicaLatch;
+
+ public ChildWatcher(CountDownLatch underReplicaLatch) {
+ this.underReplicaLatch = underReplicaLatch;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ LOG.info("Received notification for the ledger path : "
+ + event.getPath());
+ for (Long ledgerId : ledgerList) {
+ if (event.getPath().contains(ledgerId + "")) {
+ urLedgerList.add(ledgerId);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Count down and waiting for next notification");
+ }
+ // count down and waiting for next notification
+ underReplicaLatch.countDown();
+ }
+ }
+
+ private BookieServer getAuditorBookie() throws Exception {
+ List<BookieServer> auditors = new LinkedList<BookieServer>();
+ byte[] data = zkc.getData(electionPath, false, null);
+ assertNotNull("Auditor election failed", data);
+ for (int i = 0; i < bookieCount(); i++) {
+ BookieId bookieId = addressByIndex(i);
+ if (new String(data).contains(bookieId + "")) {
+ auditors.add(serverByIndex(i));
+ }
+ }
+ assertEquals("Multiple Bookies acting as Auditor!", 1, auditors
+ .size());
+ return auditors.get(0);
+ }
+
+ private Auditor getAuditorBookiesAuditor() throws Exception {
+ BookieServer auditorBookieServer = getAuditorBookie();
+ String bookieAddr = auditorBookieServer.getBookieId().toString();
+ return auditorElectors.get(bookieAddr).auditor;
+ }
+
+ private String shutDownNonAuditorBookie() throws Exception {
+ // shutdown bookie which is not an auditor
+ int indexOf = indexOfServer(getAuditorBookie());
+ int bkIndexDownBookie;
+ if (indexOf < lastBookieIndex()) {
+ bkIndexDownBookie = indexOf + 1;
+ } else {
+ bkIndexDownBookie = indexOf - 1;
+ }
+ return shutdownBookie(bkIndexDownBookie);
+ }
+}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
index 1c8b62642da..115634638ce 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import lombok.Cleanup;
@@ -613,6 +614,8 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
final String missingReplica = "localhost:3181";
// disabling replication
+ AtomicInteger callbackCount = new AtomicInteger();
+ lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
lum.disableLedgerReplication();
log.info("Disabled Ledeger Replication");
@@ -630,6 +633,7 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
} catch (TimeoutException te) {
// expected behaviour, as the replication is disabled
}
+ assertEquals(callbackCount.get(), 1, "Notify callback times mismatch");
}
/**
@@ -650,7 +654,8 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
log.debug("Unexpected exception while marking urLedger", e);
fail("Unexpected exception while marking urLedger" + e.getMessage());
}
-
+ AtomicInteger callbackCount = new AtomicInteger();
+ lum.notifyLedgerReplicationEnabled((rc, result) -> callbackCount.incrementAndGet());
// disabling replication
lum.disableLedgerReplication();
log.debug("Disabled Ledeger Replication");
@@ -687,6 +692,7 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
znodeLatch.await(5, TimeUnit.SECONDS);
log.debug("Enabled Ledeger Replication");
assertEquals(znodeLatch.getCount(), 0, "Failed to disable ledger replication!");
+ assertEquals(callbackCount.get(), 2, "Notify callback times mismatch");
} finally {
thread1.interrupt();
}
@@ -748,6 +754,17 @@ public class LedgerUnderreplicationManagerTest extends BaseMetadataStoreTest {
assertEquals(underReplicaMgr1.getReplicasCheckCTime(), curTime);
}
+ @Test(timeOut = 60000, dataProvider = "impl")
+ public void testLostBookieRecoveryDelay(String provider, Supplier<String> urlSupplier) throws Exception {
+ methodSetup(urlSupplier);
+
+ AtomicInteger callbackCount = new AtomicInteger();
+ lum.notifyLostBookieRecoveryDelayChanged((rc, result) -> callbackCount.incrementAndGet());
+ // update the lost bookie recovery delay; the change should fire the registered callback
+ lum.setLostBookieRecoveryDelay(10);
+ Awaitility.await().until(() -> callbackCount.get() == 2);
+ }
+
private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica) throws Exception {
Long ledgerA = 0xfeadeefdacL;
String znodeA = getUrLedgerZnode(ledgerA);