You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original message) was lost in this plain-text rendering.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2022/07/12 14:57:25 UTC

[bookkeeper] branch master updated: Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c3706e9c25 Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)
c3706e9c25 is described below

commit c3706e9c2508ba9042afc0e3c19a92c30bc2b32d
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Jul 12 07:57:19 2022 -0700

    Shut down ReplicationWorker and Auditor on non-recoverable ZK error (#3374)
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  3 +-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  2 +-
 .../bookkeeper/meta/FlatLedgerManagerFactory.java  |  4 +-
 .../bookkeeper/meta/LedgerManagerFactory.java      |  3 +-
 .../LegacyHierarchicalLedgerManagerFactory.java    |  4 +-
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |  3 +-
 .../meta/ZkLedgerUnderreplicationManager.java      | 68 +++++++++---------
 .../org/apache/bookkeeper/replication/Auditor.java | 50 +++++++++++--
 .../replication/ReplicationException.java          | 25 +++++++
 .../bookkeeper/replication/ReplicationWorker.java  | 14 ++--
 .../autorecovery/ListUnderReplicatedCommand.java   |  3 +-
 .../QueryAutoRecoveryStatusCommand.java            |  3 +-
 .../cli/commands/autorecovery/ToggleCommand.java   |  3 +-
 .../bookie/ForceAuditorChecksCmdTest.java          |  5 +-
 .../replication/AuditorPeriodicCheckTest.java      |  3 +-
 .../replication/TestReplicationWorker.java         | 82 ++++++++++++++++++----
 .../metadata/etcd/EtcdLedgerManagerFactory.java    |  3 +-
 .../ListUnderReplicatedCommandTest.java            |  8 +--
 18 files changed, 201 insertions(+), 85 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index a7e15f67b6..47536c5c33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -106,7 +106,6 @@ import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1897,7 +1896,7 @@ public class BookieShell implements Tool {
                                 underreplicationManager.setReplicasCheckCTime(time);
                             }
                         }
-                    } catch (InterruptedException | KeeperException | ReplicationException e) {
+                    } catch (InterruptedException | ReplicationException e) {
                         LOG.error("Exception while trying to reset last run time ", e);
                         return -1;
                     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 54cb566a30..279e745101 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1471,7 +1471,7 @@ public class BookKeeperAdmin implements AutoCloseable {
     }
 
     private LedgerUnderreplicationManager getUnderreplicationManager()
-            throws CompatibilityException, KeeperException, InterruptedException {
+            throws CompatibilityException, UnavailableException, InterruptedException {
         if (underreplicationManager == null) {
             underreplicationManager = mFactory.newLedgerUnderreplicationManager();
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index e613082d64..95269d3c58 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
 
 /**
@@ -86,7 +85,8 @@ public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
 
     @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+            throws ReplicationException.UnavailableException, InterruptedException,
+            ReplicationException.CompatibilityException {
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index d213235d6e..3d2355f87a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -84,7 +84,8 @@ public interface LedgerManagerFactory extends AutoCloseable {
      * @see LedgerUnderreplicationManager
      */
     LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
+            throws ReplicationException.UnavailableException,
+            InterruptedException, ReplicationException.CompatibilityException;
 
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index a218ef3eb9..03f8828134 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -28,7 +28,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
 
 /**
@@ -93,7 +92,8 @@ public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerMana
 
     @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException{
+            throws ReplicationException.UnavailableException, InterruptedException,
+            ReplicationException.CompatibilityException{
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index 3ad303aa28..e8dfdb08f3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -671,7 +671,8 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
     }
 
     @Override
-    public LedgerUnderreplicationManager newLedgerUnderreplicationManager() throws KeeperException,
+    public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
+            throws ReplicationException.UnavailableException,
             InterruptedException, ReplicationException.CompatibilityException {
         // TODO: currently just use zk ledger underreplication manager
         return new ZkLedgerUnderreplicationManager(conf, zk);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 9dcc81c162..097b877ac0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -128,7 +128,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     private final SubTreeCache subTreeCache;
 
     public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
-            throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
+            throws UnavailableException, InterruptedException, ReplicationException.CompatibilityException {
         this.conf = conf;
         rootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
         basePath = getBasePath(rootPath);
@@ -149,7 +149,11 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             }
         });
 
-        checkLayout();
+        try {
+            checkLayout();
+        } catch (KeeperException ke) {
+            throw ReplicationException.fromKeeperException("", ke);
+        }
     }
 
     public static String getBasePath(String rootPath) {
@@ -284,7 +288,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             underreplicatedLedger.setReplicaList(replicaList);
             return underreplicatedLedger;
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -424,7 +428,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             // znode in place, so the ledger is checked.
         } catch (KeeperException ke) {
             LOG.error("Error deleting underreplicated ledger znode", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -578,7 +582,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         try {
             return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -608,7 +612,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                 // nothing found, wait for a watcher to trigger
                 changedLatch.await();
             } catch (KeeperException ke) {
-                throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+                throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
             } catch (InterruptedException ie) {
                 Thread.currentThread().interrupt();
                 throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -639,7 +643,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             // this is ok
         } catch (KeeperException ke) {
             LOG.error("Error deleting underreplicated ledger lock", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -660,7 +664,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             // this is ok
         } catch (KeeperException ke) {
             LOG.error("Error deleting underreplicated ledger lock", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
@@ -684,8 +688,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                     "AutoRecovery is already disabled!", ke);
         } catch (KeeperException ke) {
             LOG.error("Exception while stopping auto ledger re-replication", ke);
-            throw new ReplicationException.UnavailableException(
-                    "Exception while stopping auto ledger re-replication", ke);
+            throw ReplicationException.fromKeeperException("Exception while stopping auto ledger re-replication", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException(
@@ -708,8 +711,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                     "AutoRecovery is already enabled!", ke);
         } catch (KeeperException ke) {
             LOG.error("Exception while resuming ledger replication", ke);
-            throw new ReplicationException.UnavailableException(
-                    "Exception while resuming auto ledger re-replication", ke);
+            throw ReplicationException.fromKeeperException("Exception while resuming auto ledger re-replication", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException(
@@ -729,8 +731,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         } catch (KeeperException ke) {
             LOG.error("Error while checking the state of "
                     + "ledger re-replication", ke);
-            throw new ReplicationException.UnavailableException(
-                    "Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException(
@@ -765,8 +766,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
         } catch (KeeperException ke) {
             LOG.error("Error while checking the state of "
                     + "ledger re-replication", ke);
-            throw new ReplicationException.UnavailableException(
-                    "Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException(
@@ -791,10 +791,14 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
      */
     public static String acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath,
         long ledgerId, List<ACL> zkAcls)
-            throws KeeperException, InterruptedException {
-        final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
-        ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
-        return lockPath;
+            throws UnavailableException, InterruptedException {
+        try {
+            final String lockPath = getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId);
+            ZkUtils.createFullPathOptimistic(zkc, lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
+            return lockPath;
+        } catch (KeeperException ke) {
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
+        }
     }
 
     @Override
@@ -824,7 +828,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             return false;
         } catch (KeeperException ke) {
             LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -845,7 +849,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             }
         } catch (KeeperException ke) {
             LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -860,7 +864,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             return Integer.parseInt(new String(data, UTF_8));
         } catch (KeeperException ke) {
             LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -882,7 +886,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             zkc.addWatch(urLedgerPath, w, AddWatchMode.PERSISTENT_RECURSIVE);
         } catch (KeeperException ke) {
             LOG.error("Error while checking the state of underReplicated ledgers", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -907,7 +911,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             }
         } catch (KeeperException ke) {
             LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke);
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -928,7 +932,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             // this is ok.
         } catch (KeeperException e) {
             LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e);
-            throw new ReplicationException.UnavailableException(
+            throw ReplicationException.fromKeeperException(
                     "Error while getting ReplicationWorkerId rereplicating Ledger", e);
         } catch (InterruptedException e) {
             LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e);
@@ -957,7 +961,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                 zkc.create(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
             }
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -978,7 +982,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             LOG.warn("checkAllLedgersCtimeZnode is not yet available");
             return -1;
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1004,7 +1008,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                         CreateMode.PERSISTENT);
             }
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1025,7 +1029,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
             return -1;
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1050,7 +1054,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
                 LOG.debug("setReplicasCheckCTime completed successfully");
             }
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
@@ -1070,7 +1074,7 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
             LOG.warn("replicasCheckCtimeZnode is not yet available");
             return -1;
         } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
+            throw ReplicationException.fromKeeperException("Error contacting zookeeper", ke);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index bad82eb17d..bccdb1d1df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -116,7 +116,6 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -527,9 +526,6 @@ public class Auditor implements AutoCloseable {
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
-        } catch (KeeperException ioe) {
-            throw new UnavailableException(
-                    "Exception while initializing Auditor", ioe);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new UnavailableException(
@@ -694,6 +690,9 @@ public class Auditor implements AutoCloseable {
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     LOG.error("Interrupted while for LedgersReplication to be enabled ", ie);
+                } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                    LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                    submitShutdownTask();
                 } catch (UnavailableException ue) {
                     LOG.error("Exception while reading from ZK", ue);
                 } finally {
@@ -771,6 +770,10 @@ public class Auditor implements AutoCloseable {
             long initialDelay;
             try {
                 checkAllLedgersLastExecutedCTime = ledgerUnderreplicationManager.getCheckAllLedgersCTime();
+            } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                submitShutdownTask();
+                return;
             } catch (UnavailableException ue) {
                 LOG.error("Got UnavailableException while trying to get checkAllLedgersCTime", ue);
                 checkAllLedgersLastExecutedCTime = -1;
@@ -810,8 +813,6 @@ public class Auditor implements AutoCloseable {
                         LOG.info("Completed checkAllLedgers in {} milliSeconds", checkAllLedgersDuration);
                         checkAllLedgersTime.registerSuccessfulEvent(checkAllLedgersDuration, TimeUnit.MILLISECONDS);
                         checkSuccess = true;
-                    } catch (KeeperException ke) {
-                        LOG.error("Exception while running periodic check", ke);
                     } catch (InterruptedException ie) {
                         Thread.currentThread().interrupt();
                         LOG.error("Interrupted while running periodic check", ie);
@@ -819,6 +820,9 @@ public class Auditor implements AutoCloseable {
                         LOG.error("Exception running periodic check", bke);
                     } catch (IOException ioe) {
                         LOG.error("I/O exception running periodic check", ioe);
+                    } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                        LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                        submitShutdownTask();
                     } catch (ReplicationException.UnavailableException ue) {
                         LOG.error("Underreplication manager unavailable running periodic check", ue);
                     } finally {
@@ -846,6 +850,10 @@ public class Auditor implements AutoCloseable {
             long initialDelay;
             try {
                 placementPolicyCheckLastExecutedCTime = ledgerUnderreplicationManager.getPlacementPolicyCheckCTime();
+            } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                submitShutdownTask();
+                return;
             } catch (UnavailableException ue) {
                 LOG.error("Got UnavailableException while trying to get placementPolicyCheckCTime", ue);
                 placementPolicyCheckLastExecutedCTime = -1;
@@ -969,6 +977,10 @@ public class Auditor implements AutoCloseable {
         long initialDelay;
         try {
             replicasCheckLastExecutedCTime = ledgerUnderreplicationManager.getReplicasCheckCTime();
+        } catch (ReplicationException.NonRecoverableReplicationException nre) {
+            LOG.error("Non Recoverable Exception while reading from ZK", nre);
+            submitShutdownTask();
+            return;
         } catch (UnavailableException ue) {
             LOG.error("Got UnavailableException while trying to get replicasCheckCTime", ue);
             replicasCheckLastExecutedCTime = -1;
@@ -1070,6 +1082,9 @@ public class Auditor implements AutoCloseable {
             try {
                 Auditor.this.ledgerUnderreplicationManager
                         .notifyLostBookieRecoveryDelayChanged(LostBookieRecoveryDelayChangedCb.this);
+            } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                submitShutdownTask();
             } catch (UnavailableException ae) {
                 LOG.error("Exception while registering for a LostBookieRecoveryDelay notification", ae);
             }
@@ -1134,6 +1149,10 @@ public class Auditor implements AutoCloseable {
             throws BKAuditException, InterruptedException, BKException {
         try {
             waitIfLedgerReplicationDisabled();
+        } catch (ReplicationException.NonRecoverableReplicationException nre) {
+            LOG.error("Non Recoverable Exception while reading from ZK", nre);
+            submitShutdownTask();
+            return;
         } catch (UnavailableException ue) {
             LOG.error("Underreplication unavailable, skipping audit."
                       + "Will retry after a period");
@@ -1294,7 +1313,7 @@ public class Auditor implements AutoCloseable {
      * List all the ledgers and check them individually. This should not
      * be run very often.
      */
-    void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
+    void checkAllLedgers() throws BKException, IOException, InterruptedException {
         final BookKeeper localClient = getBookKeeper(conf);
         final BookKeeperAdmin localAdmin = getBookKeeperAdmin(localClient);
         try {
@@ -1309,6 +1328,10 @@ public class Auditor implements AutoCloseable {
                         FutureUtils.complete(processFuture, null);
                         return;
                     }
+                } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                    LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                    submitShutdownTask();
+                    return;
                 } catch (UnavailableException ue) {
                     LOG.error("Underreplication manager unavailable running periodic check", ue);
                     FutureUtils.complete(processFuture, null);
@@ -1366,6 +1389,9 @@ public class Auditor implements AutoCloseable {
             FutureUtils.result(processFuture, BKException.HANDLER);
             try {
                 ledgerUnderreplicationManager.setCheckAllLedgersCTime(System.currentTimeMillis());
+            } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                LOG.error("Non Recoverable Exception while reading from ZK", nre);
+                submitShutdownTask();
             } catch (UnavailableException ue) {
                 LOG.error("Got exception while trying to set checkAllLedgersCTime", ue);
             }
@@ -1489,6 +1515,9 @@ public class Auditor implements AutoCloseable {
         }
         try {
             ledgerUnderreplicationManager.setPlacementPolicyCheckCTime(System.currentTimeMillis());
+        } catch (ReplicationException.NonRecoverableReplicationException nre) {
+            LOG.error("Non Recoverable Exception while reading from ZK", nre);
+            submitShutdownTask();
         } catch (UnavailableException ue) {
             LOG.error("Got exception while trying to set PlacementPolicyCheckCTime", ue);
         }
@@ -1952,6 +1981,9 @@ public class Auditor implements AutoCloseable {
         }
         try {
             ledgerUnderreplicationManager.setReplicasCheckCTime(System.currentTimeMillis());
+        } catch (ReplicationException.NonRecoverableReplicationException nre) {
+            LOG.error("Non Recoverable Exception while reading from ZK", nre);
+            submitShutdownTask();
         } catch (UnavailableException ue) {
             LOG.error("Got exception while trying to set ReplicasCheckCTime", ue);
         }
@@ -2053,6 +2085,10 @@ public class Auditor implements AutoCloseable {
                     ledgerInRange);
             mcbForThisLedgerRange.processResult(BKException.Code.OK, null, null);
             return true;
+        } catch (ReplicationException.NonRecoverableReplicationException nre) {
+            LOG.error("Non Recoverable Exception while reading from ZK", nre);
+            submitShutdownTask();
+            return true;
         } catch (UnavailableException une) {
             LOG.error("Got exception while trying to check if ledger: {} is underreplicated", ledgerInRange, une);
             mcbForThisLedgerRange.processResult(BKException.getExceptionCode(une), null, null);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 733f63bde8..34479fbf11 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -19,12 +19,22 @@
 package org.apache.bookkeeper.replication;
 
 import java.util.function.Function;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Exceptions for use within the replication service.
  */
 public abstract class ReplicationException extends Exception {
 
+    /** Connection loss / session expiry -> NonRecoverableReplicationException; other ZK errors -> Unavailable. */
+    public static UnavailableException fromKeeperException(String message, KeeperException ke) {
+        if (ke instanceof KeeperException.ConnectionLossException
+                || ke instanceof KeeperException.SessionExpiredException) {
+            return new NonRecoverableReplicationException(message, ke);
+        }
+        return new UnavailableException(message, ke);
+    }
+
     public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> {
         if (cause instanceof ReplicationException) {
             return (ReplicationException) cause;
@@ -56,6 +66,21 @@ public abstract class ReplicationException extends Exception {
         }
     }
 
+    /**
+     * Unrecoverable replication error (e.g. lost ZK session); the service must shut down and be restarted.
+     */
+    public static class NonRecoverableReplicationException extends UnavailableException {
+        private static final long serialVersionUID = 31872211L;
+
+        public NonRecoverableReplicationException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public NonRecoverableReplicationException(String message) {
+            super(message);
+        }
+    }
+
     /**
      * Compatibility error. This version of the code, doesn't know how to
      * deal with the metadata it has found.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 721380a656..c4a0e430d6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -69,7 +69,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,7 +141,7 @@ public class ReplicationWorker implements Runnable {
      *            - configurations
      */
     public ReplicationWorker(final ServerConfiguration conf)
-            throws CompatibilityException, KeeperException,
+            throws CompatibilityException, UnavailableException,
             InterruptedException, IOException {
         this(conf, NullStatsLogger.INSTANCE);
     }
@@ -159,8 +158,7 @@ public class ReplicationWorker implements Runnable {
      */
     public ReplicationWorker(final ServerConfiguration conf,
                              StatsLogger statsLogger)
-            throws CompatibilityException, KeeperException,
-
+            throws CompatibilityException, UnavailableException,
             InterruptedException, IOException {
         this(conf, Auditor.createBookKeeperClient(conf), true, statsLogger);
     }
@@ -169,8 +167,7 @@ public class ReplicationWorker implements Runnable {
                       BookKeeper bkc,
                       boolean ownBkc,
                       StatsLogger statsLogger)
-            throws CompatibilityException, KeeperException,
-            InterruptedException, IOException {
+            throws CompatibilityException, InterruptedException, UnavailableException {
         this.conf = conf;
         this.bkc = bkc;
         this.ownBkc = ownBkc;
@@ -243,6 +240,11 @@ public class ReplicationWorker implements Runnable {
             } catch (BKException e) {
                 LOG.error("BKException while replicating fragments", e);
                 waitBackOffTime(rwRereplicateBackoffMs);
+            } catch (ReplicationException.NonRecoverableReplicationException nre) {
+                LOG.error("NonRecoverableReplicationException "
+                        + "while replicating fragments", nre);
+                shutdown();
+                return;
             } catch (UnavailableException e) {
                 LOG.error("UnavailableException "
                         + "while replicating fragments", e);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
index 7979d1d1b5..09077cef70 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommand.java
@@ -39,7 +39,6 @@ import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
 import org.apache.bookkeeper.util.LedgerIdFormatter;
 import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,7 +140,7 @@ public class ListUnderReplicatedCommand extends BookieCommand<ListUnderReplicate
             LedgerUnderreplicationManager underreplicationManager;
             try {
                 underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-            } catch (KeeperException | ReplicationException.CompatibilityException e) {
+            } catch (ReplicationException e) {
                 throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
index 0f86a2d2ec..b2b5e01e40 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/QueryAutoRecoveryStatusCommand.java
@@ -34,7 +34,6 @@ import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,7 +98,7 @@ public class QueryAutoRecoveryStatusCommand
             List<LedgerRecoverInfo> ledgerList = new LinkedList<>();
             try {
                 underreplicationManager = mFactory.newLedgerUnderreplicationManager();
-            } catch (KeeperException | ReplicationException.CompatibilityException e) {
+            } catch (ReplicationException e) {
                 throw new UncheckedExecutionException("Failed to new ledger underreplicated manager", e);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
index b43e73fc62..99185a8a97 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ToggleCommand.java
@@ -32,7 +32,6 @@ import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +112,7 @@ public class ToggleCommand extends BookieCommand<ToggleCommand.AutoRecoveryFlags
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new UncheckedExecutionException(e);
-            } catch (KeeperException | ReplicationException e) {
+            } catch (ReplicationException e) {
                 throw new UncheckedExecutionException(e);
             }
             return null;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
index ad96129914..18785a58b4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
@@ -25,7 +25,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.zookeeper.KeeperException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -62,7 +61,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
                 urM.setCheckAllLedgersCTime(curTime);
                 urM.setPlacementPolicyCheckCTime(curTime);
                 urM.setReplicasCheckCTime(curTime);
-            } catch (InterruptedException | KeeperException | ReplicationException e) {
+            } catch (InterruptedException | ReplicationException e) {
                 throw new UncheckedExecutionException(e);
             }
             return null;
@@ -87,7 +86,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
                 if (replicasCheckCTime > (curTime - (20 * 24 * 60 * 60 * 1000))) {
                     Assert.fail("The replicasCheckCTime should have been reset to atleast 20 days old");
                 }
-            } catch (InterruptedException | KeeperException | ReplicationException e) {
+            } catch (InterruptedException | ReplicationException e) {
                 throw new UncheckedExecutionException(e);
             }
             return null;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 18ace93533..635ef87ab5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -81,7 +81,6 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
 import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
-import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -730,7 +729,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
             super(bookieIdentifier, conf, statsLogger);
         }
 
-        void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
+        void checkAllLedgers() throws BKException, IOException, InterruptedException {
             super.checkAllLedgers();
             latchRef.get().countDown();
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index bb302350a1..a69b9cbd49 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -34,11 +34,10 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import lombok.Cleanup;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.ClientUtil;
@@ -67,6 +66,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -103,13 +103,16 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
     }
 
     TestReplicationWorker(String ledgerManagerFactory) {
-        super(3);
+        super(3, 300);
         LOG.info("Running test case using ledger manager : "
                 + ledgerManagerFactory);
         // set ledger manager name
         baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
         baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
         baseConf.setRereplicationEntryBatchSize(3);
+        baseConf.setZkTimeout(7000);
+        baseConf.setZkRetryBackoffMaxMs(500);
+        baseConf.setZkRetryBackoffStartMs(10);
     }
 
     @Override
@@ -594,9 +597,11 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             int rw1UnableToReadEntriesForReplication = rw1.unableToReadEntriesForReplication.get(lh.getId()).size();
             int rw2UnableToReadEntriesForReplication = rw2.unableToReadEntriesForReplication.get(lh.getId()).size();
             assertTrue(
-                    "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication + " in RW2: "
+                    "unableToReadEntriesForReplication in RW1: " + rw1UnableToReadEntriesForReplication
+                            + " in RW2: "
                             + rw2UnableToReadEntriesForReplication,
-                    (rw1UnableToReadEntriesForReplication == 0) || (rw2UnableToReadEntriesForReplication == 0));
+                    (rw1UnableToReadEntriesForReplication == 0)
+                            || (rw2UnableToReadEntriesForReplication == 0));
         } finally {
             rw1.shutdown();
             rw2.shutdown();
@@ -609,7 +614,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
 
         public InjectedReplicationWorker(ServerConfiguration conf, StatsLogger statsLogger,
                 CopyOnWriteArrayList<Long> delayReplicationPeriods)
-                throws CompatibilityException, KeeperException, InterruptedException, IOException {
+                throws CompatibilityException, ReplicationException.UnavailableException,
+                InterruptedException, IOException {
             super(conf, statsLogger);
             this.delayReplicationPeriods = delayReplicationPeriods;
         }
@@ -829,18 +835,63 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
             assertTrue("Replication worker should be running", rw.isRunning());
 
             stopZKCluster();
-            // Wait for disconnection to be picked up
+            // ZK is down for shorter period than reconnect timeout
+            Thread.sleep(1000);
+            startZKCluster();
+
+            assertTrue("Replication worker should not shutdown", rw.isRunning());
+        }
+    }
+
+    /**
+     * Test that the replication worker shuts down on non-recoverable ZK connection loss.
+     */
+    @Test
+    public void testRWZKConnectionLostOnNonRecoverableZkError() throws Exception {
+        for (int j = 0; j < 3; j++) {
+            LedgerHandle lh = bkc.createLedger(1, 1, 1,
+                    BookKeeper.DigestType.CRC32, TESTPASSWD,
+                    null);
+            final long createdLedgerId = lh.getId();
             for (int i = 0; i < 10; i++) {
-                if (!zk.getState().isConnected()) {
-                    break;
-                }
-                Thread.sleep(1000);
+                lh.addEntry(data);
             }
-            assertFalse(zk.getState().isConnected());
-            startZKCluster();
+            lh.close();
+        }
+
+        killBookie(2);
+        killBookie(1);
+        startNewBookie();
+        startNewBookie();
+
+        servers.get(0).getConfiguration().setRwRereplicateBackoffMs(100);
+        servers.get(0).startAutoRecovery();
+
+        Auditor auditor = getAuditor(10, TimeUnit.SECONDS);
+        ReplicationWorker rw = servers.get(0).getReplicationWorker();
+
+        ZkLedgerUnderreplicationManager ledgerUnderreplicationManager =
+                (ZkLedgerUnderreplicationManager) FieldUtils.readField(auditor,
+                        "ledgerUnderreplicationManager", true);
 
-            assertTrue("Replication worker should still be running", rw.isRunning());
+        ZooKeeper zkc = (ZooKeeper) FieldUtils.readField(ledgerUnderreplicationManager, "zkc", true);
+        auditor.submitAuditTask().get();
+
+        assertTrue(zkc.getState().isConnected());
+        zkc.close();
+        assertFalse(zkc.getState().isConnected());
+
+        auditor.submitAuditTask();
+        rw.run();
+
+        for (int i = 0; i < 10; i++) {
+            if (!rw.isRunning() && !auditor.isRunning()) {
+                break;
+            }
+            Thread.sleep(1000);
         }
+        assertFalse("Replication worker should NOT be running", rw.isRunning());
+        assertFalse("Auditor should NOT be running", auditor.isRunning());
     }
 
     private void killAllBookies(LedgerHandle lh, BookieId excludeBK)
@@ -975,7 +1026,8 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
          */
         BookKeeper bkWithMockZK = new BookKeeper(baseClientConf, zkFaultInjectionWrapper);
         long ledgerId = 567L;
-        LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2, BookKeeper.DigestType.CRC32, TESTPASSWD,
+        LedgerHandle lh = bkWithMockZK.createLedgerAdv(ledgerId, 2, 2, 2,
+                BookKeeper.DigestType.CRC32, TESTPASSWD,
                 null);
         for (int i = 0; i < 10; i++) {
             lh.addEntry(i, data);
diff --git a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
index 4d83f73bf0..11eef381c1 100644
--- a/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
+++ b/metadata-drivers/etcd/src/main/java/org/apache/bookkeeper/metadata/etcd/EtcdLedgerManagerFactory.java
@@ -30,6 +30,7 @@ import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
+import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
@@ -88,7 +89,7 @@ class EtcdLedgerManagerFactory implements LedgerManagerFactory {
 
     @Override
     public LedgerUnderreplicationManager newLedgerUnderreplicationManager()
-        throws KeeperException, InterruptedException, CompatibilityException {
+        throws ReplicationException.UnavailableException, InterruptedException, CompatibilityException {
         throw new UnsupportedOperationException();
     }
 
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
index b4d86828a7..b0453980bc 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/ListUnderReplicatedCommandTest.java
@@ -72,7 +72,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
 
     @Test
     public void testWithoutArgs()
-        throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+        throws InterruptedException, ReplicationException {
         testCommand("");
         verify(factory, times(1)).newLedgerUnderreplicationManager();
         verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -82,7 +82,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
 
     @Test
     public void testMissingReplica()
-        throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+        throws InterruptedException, ReplicationException {
         testCommand("-mr", "");
         verify(factory, times(1)).newLedgerUnderreplicationManager();
         verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -92,7 +92,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
 
     @Test
     public void testExcludingMissingReplica()
-        throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+        throws InterruptedException, ReplicationException {
         testCommand("-emr", "");
         verify(factory, times(1)).newLedgerUnderreplicationManager();
         verify(underreplicationManager, times(1)).listLedgersToRereplicate(any());
@@ -102,7 +102,7 @@ public class ListUnderReplicatedCommandTest extends BookieCommandTestBase {
 
     @Test
     public void testPrintMissingReplica()
-        throws InterruptedException, ReplicationException.CompatibilityException, KeeperException {
+        throws InterruptedException, ReplicationException {
 
         ArrayList<String> list = new ArrayList<>();
         list.add("replica");