You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/07/12 14:57:25 UTC
[bookkeeper] branch master updated: Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c3706e9c25 Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)
c3706e9c25 is described below
commit c3706e9c2508ba9042afc0e3c19a92c30bc2b32d
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Jul 12 07:57:19 2022 -0700
Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 3 +-
.../apache/bookkeeper/client/BookKeeperAdmin.java | 2 +-
.../bookkeeper/meta/FlatLedgerManagerFactory.java | 4 +-
.../bookkeeper/meta/LedgerManagerFactory.java | 3 +-
.../LegacyHierarchicalLedgerManagerFactory.java | 4 +-
.../bookkeeper/meta/MSLedgerManagerFactory.java | 3 +-
.../meta/ZkLedgerUnderreplicationManager.java | 68 +++++++++---------
.../org/apache/bookkeeper/replication/Auditor.java | 50 +++++++++++--
.../replication/ReplicationException.java | 25 +++++++
.../bookkeeper/replication/ReplicationWorker.java | 14 ++--
.../autorecovery/ListUnderReplicatedCommand.java | 3 +-
.../QueryAutoRecoveryStatusCommand.java | 3 +-
.../cli/commands/autorecovery/ToggleCommand.java | 3 +-
.../bookie/ForceAuditorChecksCmdTest.java | 5 +-
.../replication/AuditorPeriodicCheckTest.java | 3 +-
.../replication/TestReplicationWorker.java | 82 ++++++++++++++++++----
.../metadata/etcd/EtcdLedgerManagerFactory.java | 3 +-
.../ListUnderReplicatedCommandTest.java | 8 +--
18 files changed, 201 insertions(+), 85 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index a7e15f67b6..47536c5c33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -106,7 +106,6 @@ import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1897,7 +1896,7 @@ public class BookieShell implements Tool {
underreplicationManager.setReplicasCheckCTime(time);
}
}
- } catch (InterruptedException | KeeperException | ReplicationException e) {
+ } catch (InterruptedException | ReplicationException e) {
LOG.error("Exception while trying to reset last run time ", e);
return -1;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 54cb566a30..279e745101 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1471,7 +1471,7 @@ public class BookKeeperAdmin implements AutoCloseable {
}
private LedgerUnderreplicationManager getUnderreplicationManager()
- throws CompatibilityException, KeeperException, InterruptedException {
+ throws CompatibilityException, UnavailableException, InterruptedException {
if (underreplicationManager == null) {
underreplicationManager = mFactory.newLedgerUnderreplicationManager();
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index e613082d64..95269d3c58 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
/**
@@ -86,7 +85,8 @@ public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+ throws ReplicationException.UnavailableException, InterruptedException,
+ ReplicationException.CompatibilityException {
return new ZkLedgerUnderreplicationManager(conf, zk);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index d213235d6e..3d2355f87a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -84,7 +84,8 @@ public interface LedgerManagerFactory extends AutoCloseable {
* @see LedgerUnderreplicationManager
*/
LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
+ throws ReplicationException.UnavailableException,
+ InterruptedException, ReplicationException.CompatibilityException;
/**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index a218ef3eb9..03f8828134 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
/**
@@ -93,7 +92,8 @@ public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerMana
@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+ throws ReplicationException.UnavailableException, InterruptedException,
+ ReplicationException.CompatibilityException{
return new ZkLedgerUnderreplicationManager(conf, zk);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 3ad303aa28..e8dfdb08f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -671,7 +671,8 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
}
@Override
- public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
+ public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+ throws ReplicationException.UnavailableException,
InterruptedException, ReplicationException.CompatibilityException {
// TODO: currently just use zk ledger underreplication manager
return new ZkLedgerUnderreplicationManager(conf, zk);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 9dcc81c162..097b877ac0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -128,7 +128,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
private final SubTreeCache subTreeCache;
public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
- throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+ throws UnavailableException, InterruptedException, ReplicationException.CompatibilityException {
this.conf = conf;
rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
basePath = getBasePath(rootPath);
@@ -149,7 +149,11 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
});
- checkLayout();
+ try {
+ checkLayout();
+ } catch (KeeperException ke) {
+ throw ReplicationException.fromKeeperException("", ke);
+ }
}
public static String getBasePath(String rootPath) {
@@ -284,7 +288,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
underreplicatedLedger.setReplicaList(replicaList);
return underreplicatedLedger;
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -424,7 +428,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// znode in place, so the ledger is checked.
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger znode", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -578,7 +582,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
try {
return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -608,7 +612,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// nothing found, wait for a watcher to trigger
changedLatch.await();
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -639,7 +643,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -660,7 +664,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -684,8 +688,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
"AutoRecovery is already disabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while stopping auto ledger re-replication", ke);
- throw new ReplicationException.UnavailableException(
- "Exception while stopping auto ledger re-replication", ke);
+ throw ReplicationException.fromKeeperException("Exception while stopping auto ledger re-replication", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
@@ -708,8 +711,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
"AutoRecovery is already enabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while resuming ledger replication", ke);
- throw new ReplicationException.UnavailableException(
- "Exception while resuming auto ledger re-replication", ke);
+ throw ReplicationException.fromKeeperException("Exception while resuming auto ledger re-replication", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
@@ -729,8 +731,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
- throw new ReplicationException.UnavailableException(
- "Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
@@ -765,8 +766,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
- throw new ReplicationException.UnavailableException(
- "Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException(
@@ -791,10 +791,14 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
*/
public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath,
long ledgerId, List<ACL> zkAcls)
- throws KeeperException, InterruptedException {
- final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
- ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
- return lockPath;
+ throws UnavailableException, InterruptedException {
+ try {
+ final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
+ ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
+ return lockPath;
+ } catch (KeeperException ke) {
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
+ }
}
@Override
@@ -824,7 +828,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
return false;
} catch (KeeperException ke) {
LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -845,7 +849,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
} catch (KeeperException ke) {
LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -860,7 +864,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
return Integer.parseInt(new String(data, UTF_8));
} catch (KeeperException ke) {
LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -882,7 +886,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of underReplicated ledgers", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -907,7 +911,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
} catch (KeeperException ke) {
LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke);
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -928,7 +932,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
// this is ok.
} catch (KeeperException e) {
LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e);
- throw new ReplicationException.UnavailableException(
+ throw ReplicationException.fromKeeperException(
"Error while getting ReplicationWorkerId rereplicating Ledger", e);
} catch (InterruptedException e) {
LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e);
@@ -957,7 +961,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
zkc.create(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
}
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -978,7 +982,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
LOG.warn("checkAllLedgersCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1004,7 +1008,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
CreateMode.PERSISTENT);
}
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1025,7 +1029,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1050,7 +1054,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
LOG.debug("setReplicasCheckCTime completed successfully");
}
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1070,7 +1074,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
LOG.warn("replicasCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+ throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index bad82eb17d..bccdb1d1df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -116,7 +116,6 @@ import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -527,9 +526,6 @@ public class Auditor implements AutoCloseable {
} catch (CompatibilityException ce) {
throw new UnavailableException(
"CompatibilityException while initializing Auditor", ce);
- } catch (KeeperException ioe) {
- throw new UnavailableException(
- "Exception while initializing Auditor", ioe);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new UnavailableException(
@@ -694,6 +690,9 @@ public class Auditor implements AutoCloseable {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (UnavailableException ue) {
LOG.error("Exception while reading from ZK", ue);
} finally {
@@ -771,6 +770,10 @@ public class Auditor implements AutoCloseable {
long initialDelay;
try {
checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return;
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
checkAllLedgersLastExecutedCTime = -1;
@@ -810,8 +813,6 @@ public class Auditor implements AutoCloseable {
LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
checkSuccess = true;
- } catch (KeeperException ke) {
- LOG.error("Exception while running periodic check", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while running periodic check", ie);
@@ -819,6 +820,9 @@ public class Auditor implements AutoCloseable {
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
LOG.error("I/O exception running periodic check", ioe);
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (ReplicationException.UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
} finally {
@@ -846,6 +850,10 @@ public class Auditor implements AutoCloseable {
long initialDelay;
try {
placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return;
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue);
placementPolicyCheckLastExecutedCTime = -1;
@@ -969,6 +977,10 @@ public class Auditor implements AutoCloseable {
long initialDelay;
try {
replicasCheckLastExecutedCTime = ledgerUnderreplicationManager.getReplicasCheckCTime();
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return;
} catch (UnavailableException ue) {
LOG.error("Got UnavailableException while trying to get replicasCheckCTime", ue);
replicasCheckLastExecutedCTime = -1;
@@ -1070,6 +1082,9 @@ public class Auditor implements AutoCloseable {
try {
Auditor.this.ledgerUnderreplicationManager
.notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (UnavailableException ae) {
LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae);
}
@@ -1134,6 +1149,10 @@ public class Auditor implements AutoCloseable {
throws BKAuditException, InterruptedException, BKException {
try {
waitIfLedgerReplicationDisabled();
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return;
} catch (UnavailableException ue) {
LOG.error("Underreplication unavailable, skipping audit."
+ "Will retry after a period");
@@ -1294,7 +1313,7 @@ public class Auditor implements AutoCloseable {
* List all the ledgers and check them individually. This should not
* be run very often.
*/
- void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
+ void checkAllLedgers() throws BKException, IOException, InterruptedException {
final BookKeeper localClient = getBookKeeper(conf);
final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
try {
@@ -1309,6 +1328,10 @@ public class Auditor implements AutoCloseable {
FutureUtils.complete(processFuture, null);
return;
}
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return;
} catch (UnavailableException ue) {
LOG.error("Underreplication manager unavailable running periodic check", ue);
FutureUtils.complete(processFuture, null);
@@ -1366,6 +1389,9 @@ public class Auditor implements AutoCloseable {
FutureUtils.result(processFuture, BKException.HANDLER);
try {
ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set checkAllLedgersCTime", ue);
}
@@ -1489,6 +1515,9 @@ public class Auditor implements AutoCloseable {
}
try {
ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue);
}
@@ -1952,6 +1981,9 @@ public class Auditor implements AutoCloseable {
}
try {
ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
} catch (UnavailableException ue) {
LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
}
@@ -2053,6 +2085,10 @@ public class Auditor implements AutoCloseable {
ledgerInRange);
mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
return true;
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("Non Recoverable Exception while reading from ZK", nre);
+ submitShutdownTask();
+ return true;
} catch (UnavailableException une) {
LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, une);
mcbForThisLedgerRange.processResult(BKException.getExceptionCode(une), null, null);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 733f63bde8..34479fbf11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -19,12 +19,22 @@
package org.apache.bookkeeper.replication;
import java.util.function.Function;
+import org.apache.zookeeper.KeeperException;
/**
* Exceptions for use within the replication service.
*/
public abstract class ReplicationException extends Exception {
+ public static UnavailableException fromKeeperException(String message, KeeperException ke) {
+ if (ke instanceof KeeperException.ConnectionLossException
+ || ke instanceof KeeperException.SessionExpiredException) {
+ return new NonRecoverableReplicationException(message, ke);
+ }
+ return new UnavailableException(message, ke);
+ }
+
+
public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> {
if (cause instanceof ReplicationException) {
return (ReplicationException) cause;
@@ -56,6 +66,21 @@ public abstract class ReplicationException extends Exception {
}
}
+ /**
+ * Unrecoverable replication error (e.g. ZK connection loss or session expiry) that requires a service restart.
+ */
+ public static class NonRecoverableReplicationException extends UnavailableException {
+ private static final long serialVersionUID = 31872211L;
+
+ public NonRecoverableReplicationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public NonRecoverableReplicationException(String message) {
+ super(message);
+ }
+ }
+
/**
* Compatibility error. This version of the code, doesn't know how to
* deal with the metadata it has found.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 721380a656..c4a0e430d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -69,7 +69,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,7 +141,7 @@ public class ReplicationWorker implements Runnable {
* - configurations
*/
public ReplicationWorker(final ServerConfiguration conf)
- throws CompatibilityException, KeeperException,
+ throws CompatibilityException, UnavailableException,
InterruptedException, IOException {
this(conf, NullStatsLogger.INSTANCE);
}
@@ -159,8 +158,7 @@ public class ReplicationWorker implements Runnable {
*/
public ReplicationWorker(final ServerConfiguration conf,
StatsLogger statsLogger)
- throws CompatibilityException, KeeperException,
-
+ throws CompatibilityException, UnavailableException,
InterruptedException, IOException {
this(conf, Auditor.createBookKeeperClient(conf), true, statsLogger);
}
@@ -169,8 +167,7 @@ public class ReplicationWorker implements Runnable {
BookKeeper bkc,
boolean ownBkc,
StatsLogger statsLogger)
- throws CompatibilityException, KeeperException,
- InterruptedException, IOException {
+ throws CompatibilityException, InterruptedException, UnavailableException {
this.conf = conf;
this.bkc = bkc;
this.ownBkc = ownBkc;
@@ -243,6 +240,11 @@ public class ReplicationWorker implements Runnable {
} catch (BKException e) {
LOG.error("BKException while replicating fragments", e);
waitBackOffTime(rwRereplicateBackoffMs);
+ } catch (ReplicationException.NonRecoverableReplicationException nre) {
+ LOG.error("NonRecoverableReplicationException "
+ + "while replicating fragments", nre);
+ shutdown();
+ return;
} catch (UnavailableException e) {
LOG.error("UnavailableException "
+ "while replicating fragments", e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
index 7979d1d1b5..09077cef70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
import org.apache.bookkeeper.util.LedgerIdFormatter;
import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +140,7 @@ public class ListUnderReplicatedCommand extends BookieCommand<ListUnderReplicate
LedgerUnderreplicationManager underreplicationManager;
try {
underreplicationManager = mFactory.newLedgerUnderreplicationManager();
- } catch (KeeperException | ReplicationException.CompatibilityException e) {
+ } catch (ReplicationException e) {
throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
index 0f86a2d2ec..b2b5e01e40 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
@@ -34,7 +34,6 @@ import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,7 +98,7 @@ public class QueryAutoRecoveryStatusCommand
List<LedgerRecoverInfo> ledgerList = new LinkedList<>();
try {
underreplicationManager = mFactory.newLedgerUnderreplicationManager();
- } catch (KeeperException | ReplicationException.CompatibilityException e) {
+ } catch (ReplicationException e) {
throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
index b43e73fc62..99185a8a97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
@@ -32,7 +32,6 @@ import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +112,7 @@ public class ToggleCommand extends BookieCommand<ToggleCommand.AutoRecoveryFlags
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedExecutionException(e);
- } catch (KeeperException | ReplicationException e) {
+ } catch (ReplicationException e) {
throw new UncheckedExecutionException(e);
}
return null;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
index ad96129914..18785a58b4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
import org.junit.Test;
@@ -62,7 +61,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
urM.setCheckAllLedgersCTime(curTime);
urM.setPlacementPolicyCheckCTime(curTime);
urM.setReplicasCheckCTime(curTime);
- } catch (InterruptedException | KeeperException | ReplicationException e) {
+ } catch (InterruptedException | ReplicationException e) {
throw new UncheckedExecutionException(e);
}
return null;
@@ -87,7 +86,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
if (replicasCheckCTime > (curTime - (20 * 24 * 60 * 60 * 1000))) {
Assert.fail("The replicasCheckCTime should have been reset to atleast 20 days old");
}
- } catch (InterruptedException | KeeperException | ReplicationException e) {
+ } catch (InterruptedException | ReplicationException e) {
throw new UncheckedExecutionException(e);
}
return null;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 18ace93533..635ef87ab5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -81,7 +81,6 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
-import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -730,7 +729,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
super(bookieIdentifier, conf, statsLogger);
}
- void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
+ void checkAllLedgers() throws BKException, IOException, InterruptedException {
super.checkAllLedgers();
latchRef.get().countDown();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index bb302350a1..a69b9cbd49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -34,11 +34,10 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import lombok.Cleanup;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.ClientUtil;
@@ -67,6 +66,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -103,13 +103,16 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
}
TestReplicationWorker(String ledgerManagerFactory) {
- super(3);
+ super(3, 300);
LOG.info("Running test case using ledger manager : "
+ ledgerManagerFactory);
// set ledger manager name
baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseConf.setRereplicationEntryBatchSize(3);
+ baseConf.setZkTimeout(7000);
+ baseConf.setZkRetryBackoffMaxMs(500);
+ baseConf.setZkRetryBackoffStartMs(10);
}
@Override
@@ -594,9 +597,11 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
int rw1UnableToReadEntriesForReplication = rw1.unableToReadEntriesForReplication.get(lh.getId()).size();
int rw2UnableToReadEntriesForReplication = rw2.unableToReadEntriesForReplication.get(lh.getId()).size();
assertTrue(
- "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication + " in RW2: "
+ "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication
+ + " in RW2: "
+ rw2UnableToReadEntriesForReplication,
- (rw1UnableToReadEntriesForReplication == 0) || (rw2UnableToReadEntriesForReplication == 0));
+ (rw1UnableToReadEntriesForReplication == 0)
+ || (rw2UnableToReadEntriesForReplication == 0));
} finally {
rw1.shutdown();
rw2.shutdown();
@@ -609,7 +614,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
public InjectedReplicationWorker(ServerConfiguration conf, StatsLogger statsLogger,
CopyOnWriteArrayList<Long> delayReplicationPeriods)
- throws CompatibilityException, KeeperException, InterruptedException, IOException {
+ throws CompatibilityException, ReplicationException.UnavailableException,
+ InterruptedException, IOException {
super(conf, statsLogger);
this.delayReplicationPeriods = delayReplicationPeriods;
}
@@ -829,18 +835,63 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
assertTrue("Replication worker should be running", rw.isRunning());
stopZKCluster();
- // Wait for disconnection to be picked up
+ // ZK is down for a shorter period than the reconnect timeout
+ Thread.sleep(1000);
+ startZKCluster();
+
+ assertTrue("Replication worker should not shutdown", rw.isRunning());
+ }
+ }
+
+ /**
+ * Test that the replication worker and the auditor shut down on non-recoverable ZK connection loss.
+ */
+ @Test
+ public void testRWZKConnectionLostOnNonRecoverableZkError() throws Exception {
+ for (int j = 0; j < 3; j++) {
+ LedgerHandle lh = bkc.createLedger(1, 1, 1,
+ BookKeeper.DigestType.CRC32, TESTPASSWD,
+ null);
+ final long createdLedgerId = lh.getId();
for (int i = 0; i < 10; i++) {
- if (!zk.getState().isConnected()) {
- break;
- }
- Thread.sleep(1000);
+ lh.addEntry(data);
}
- assertFalse(zk.getState().isConnected());
- startZKCluster();
+ lh.close();
+ }
+
+ killBookie(2);
+ killBookie(1);
+ startNewBookie();
+ startNewBookie();
+
+ servers.get(0).getConfiguration().setRwRereplicateBackoffMs(100);
+ servers.get(0).startAutoRecovery();
+
+ Auditor auditor = getAuditor(10, TimeUnit.SECONDS);
+ ReplicationWorker rw = servers.get(0).getReplicationWorker();
+
+ ZkLedgerUnderreplicationManager ledgerUnderreplicationManager =
+ (ZkLedgerUnderreplicationManager) FieldUtils.readField(auditor,
+ "ledgerUnderreplicationManager", true);
- assertTrue("Replication worker should still be running", rw.isRunning());
+ ZooKeeper zkc = (ZooKeeper) FieldUtils.readField(ledgerUnderreplicationManager, "zkc", true);
+ auditor.submitAuditTask().get();
+
+ assertTrue(zkc.getState().isConnected());
+ zkc.close();
+ assertFalse(zkc.getState().isConnected());
+
+ auditor.submitAuditTask();
+ rw.run();
+
+ for (int i = 0; i < 10; i++) {
+ if (!rw.isRunning() && !auditor.isRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
}
+ assertFalse("Replication worker should NOT be running", rw.isRunning());
+ assertFalse("Auditor should NOT be running", auditor.isRunning());
}
private void killAllBookies(LedgerHandle lh, BookieId excludeBK)
@@ -975,7 +1026,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
*/
BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
long ledgerId = 567L;
- LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD,
+ LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2,
+ BookKeeper.DigestType.CRC32, TESTPASSWD,
null);
for (int i = 0; i < 10; i++) {
lh.addEntry(i, data);
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
index 4d83f73bf0..11eef381c1 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
@@ -88,7 +89,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
@Override
public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
- throws KeeperException, InterruptedException, CompatibilityException {
+ throws ReplicationException.UnavailableException, InterruptedException, CompatibilityException {
throw new UnsupportedOperationException();
}
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
index b4d86828a7..b0453980bc 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
@@ -72,7 +72,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
@Test
public void testWithoutArgs()
- throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+ throws InterruptedException, ReplicationException {
testCommand("");
verify(factory, times(1)).newLedgerUnderreplicationManager();
verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -82,7 +82,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
@Test
public void testMissingReplica()
- throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+ throws InterruptedException, ReplicationException {
testCommand("-mr", "");
verify(factory, times(1)).newLedgerUnderreplicationManager();
verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -92,7 +92,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
@Test
public void testExcludingMissingReplica()
- throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+ throws InterruptedException, ReplicationException {
testCommand("-emr", "");
verify(factory, times(1)).newLedgerUnderreplicationManager();
verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -102,7 +102,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
@Test
public void testPrintMissingReplica()
- throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+ throws InterruptedException, ReplicationException {
ArrayList<String> list = new ArrayList<>();
list.add("replica");