You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/27 03:49:37 UTC

[bookkeeper] branch branch-4.7 updated: Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 3e01125  Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager
3e01125 is described below

commit 3e0112505381b3eb922f21de1f18daf302002201
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sun Aug 26 19:51:08 2018 -0700

    Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager
    
    Descriptions of the changes in this PR:
    
     ### Motivation
    
    The Auditor calls synchronous methods from async callbacks in multiple places.
    This raises the possibility of hitting a deadlock. Issue #1578 is one such example.
    
    After looking into `LedgerUnderreplicationManager`, `markLedgerUnderreplicated`
    is the only method that is called from async callbacks. This change
    provides an async version of `markLedgerUnderreplicated`.
    
     ### Changes
    
    - add `markLedgerUnderreplicatedAsync` interface in `LedgerUnderreplicationManager`.
    - implement the logic of `markLedgerUnderreplicated` using async callbacks
    - use `markLedgerUnderreplicatedAsync` in the Auditor
    
    Related Issues: #1578
    Master Issue: #1617
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Charan Reddy Guttapalem <re...@gmail.com>, Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>
    
    This closes #1619 from sijie/async_sync_autorecovery
---
 .../org/apache/bookkeeper/client/BKException.java  |  15 ++
 .../meta/LedgerUnderreplicationManager.java        |  22 +-
 .../meta/ZkLedgerUnderreplicationManager.java      | 123 +++++++----
 .../org/apache/bookkeeper/replication/Auditor.java | 238 +++++++++------------
 .../replication/ReplicationException.java          |  11 +
 .../TestLedgerUnderreplicationManager.java         |   4 +-
 6 files changed, 224 insertions(+), 189 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 1a15ebf..ddfc795 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.util.function.Function;
+
 /**
  * Class the enumerates all the possible error conditions.
  *
@@ -28,6 +30,19 @@ package org.apache.bookkeeper.client;
 @SuppressWarnings("serial")
 public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {
 
+    public static final Function<Throwable, BKException> HANDLER = cause -> {
+        if (cause == null) {
+            return null;
+        }
+        if (cause instanceof BKException) {
+            return (BKException) cause;
+        } else {
+            BKException ex = new BKUnexpectedConditionException();
+            ex.initCause(cause);
+            return ex;
+        }
+    };
+
     BKException(int code) {
         super(code);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 21c6a4f..5bad3cc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -17,11 +17,15 @@
  */
 package org.apache.bookkeeper.meta;
 
+import com.google.common.collect.Lists;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
@@ -29,12 +33,26 @@ import org.apache.bookkeeper.replication.ReplicationException;
  * Interface for marking ledgers which need to be rereplicated.
  */
 public interface LedgerUnderreplicationManager extends AutoCloseable {
+
     /**
      * Mark a ledger as underreplicated. The replication should
      * then check which fragments are underreplicated and rereplicate them
      */
-    void markLedgerUnderreplicated(long ledgerId, String missingReplica)
-            throws ReplicationException.UnavailableException;
+    default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException {
+        FutureUtils.result(
+            markLedgerUnderreplicatedAsync(
+                ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER);
+    }
+
+    /**
+     * Mark a ledger as underreplicated with missing bookies. The replication should then
+     * check which fragements are underreplicated and rereplicate them.
+     *
+     * @param ledgerId ledger id
+     * @param missingReplicas missing replicas
+     * @return a future presents the mark result.
+     */
+    CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas);
 
     /**
      * Mark a ledger as fully replicated. If the ledger is not
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 4289bcd..db09df9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -24,21 +24,25 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.protobuf.TextFormat;
 
+import com.google.protobuf.TextFormat.ParseException;
 import java.net.UnknownHostException;
 import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.DNS;
@@ -54,6 +58,7 @@ import org.apache.bookkeeper.util.SubTreeCache;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -254,55 +259,89 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
     }
 
     @Override
-    public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
-            throws ReplicationException.UnavailableException {
+    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
-        }
-        try {
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            String znode = getUrLedgerZnode(ledgerId);
-            while (true) {
-                UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
+        }
+        final List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        final String znode = getUrLedgerZnode(ledgerId);
+        final CompletableFuture<Void> createFuture = new CompletableFuture<>();
+        tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
+        return createFuture;
+    }
+
+    private void tryMarkLedgerUnderreplicatedAsync(final String znode,
+                                                   final Collection<String> missingReplicas,
+                                                   final List<ACL> zkAcls,
+                                                   final CompletableFuture<Void> finalFuture) {
+        final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+        missingReplicas.forEach(builder::addReplica);
+        final byte[] urLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+        ZkUtils.asyncCreateFullPathOptimistic(
+            zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
+            (rc, path, ctx, name) -> {
+                if (Code.OK.intValue() == rc) {
+                    FutureUtils.complete(finalFuture, null);
+                } else if (Code.NODEEXISTS.intValue() == rc) {
+                    // we need to handle the case where the ledger has been marked as underreplicated
+                    handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+                } else {
+                    FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
+                }
+            }, null);
+    }
+
+
+    private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
+                                                          final Collection<String> missingReplicas,
+                                                          final List<ACL> zkAcls,
+                                                          final CompletableFuture<Void> finalFuture) {
+        // get the existing underreplicated ledger data
+        zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
+            if (Code.OK.intValue() == getRc) {
+                // deserialize existing underreplicated ledger data
+                final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
                 try {
-                    builder.addReplica(missingReplica);
-                    ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
-                            .printToString(builder.build()).getBytes(UTF_8),
-                            zkAcls, CreateMode.PERSISTENT);
-                } catch (KeeperException.NodeExistsException nee) {
-                    Stat s = zkc.exists(znode, false);
-                    if (s == null) {
+                    TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
+                } catch (ParseException e) {
+                    // corrupted metadata in zookeeper
+                    FutureUtils.completeExceptionally(finalFuture,
+                        new ReplicationException.UnavailableException(
+                            "Invalid underreplicated ledger data for ledger " + znode, e));
+                    return;
+                }
+                UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
+                boolean replicaAdded = false;
+                for (String missingReplica : missingReplicas) {
+                    if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
                         continue;
-                    }
-                    try {
-                        byte[] bytes = zkc.getData(znode, false, s);
-                        builder.clear();
-                        TextFormat.merge(new String(bytes, UTF_8), builder);
-                        UnderreplicatedLedgerFormat data = builder.build();
-                        if (data.getReplicaList().contains(missingReplica)) {
-                            return; // nothing to add
-                        }
+                    } else {
                         builder.addReplica(missingReplica);
-                        zkc.setData(znode,
-                                    TextFormat.printToString(builder.build()).getBytes(UTF_8),
-                                    s.getVersion());
-                    } catch (KeeperException.NoNodeException nne) {
-                        continue;
-                    } catch (KeeperException.BadVersionException bve) {
-                        continue;
-                    } catch (TextFormat.ParseException pe) {
-                        throw new ReplicationException.UnavailableException(
-                                "Invalid data found", pe);
+                        replicaAdded = true;
                     }
                 }
-                break;
+                if (!replicaAdded) { // no new missing replica is added
+                    FutureUtils.complete(finalFuture, null);
+                    return;
+                }
+                final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+                zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
+                    if (Code.OK.intValue() == setRc) {
+                        FutureUtils.complete(finalFuture, null);
+                    } else if (Code.NONODE.intValue() == setRc) {
+                        tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+                    } else if (Code.BADVERSION.intValue() == setRc) {
+                        handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+                    } else {
+                        FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
+                    }
+                }, null);
+            } else if (Code.NONODE.intValue() == getRc) {
+                tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+            } else {
+                FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
             }
-        } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
-        }
+        }, null);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 8578a5b..f937632 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
@@ -30,20 +31,21 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
@@ -366,8 +368,6 @@ public class Auditor {
                             } catch (InterruptedException ie) {
                                 Thread.currentThread().interrupt();
                                 LOG.error("Interrupted while running periodic check", ie);
-                            } catch (BKAuditException bkae) {
-                                LOG.error("Exception while running periodic check", bkae);
                             } catch (BKException bke) {
                                 LOG.error("Exception running periodic check", bke);
                             } catch (IOException ioe) {
@@ -467,8 +467,6 @@ public class Auditor {
             LOG.error("Interrupted while watching available bookies ", ie);
         } catch (BKAuditException bke) {
             LOG.error("Exception while watching available bookies", bke);
-        } catch (KeeperException ke) {
-            LOG.error("Exception reading bookie list", ke);
         }
         if (shutDownTask) {
             submitShutdownTask();
@@ -477,8 +475,7 @@ public class Auditor {
 
     @SuppressWarnings("unchecked")
     private void auditBookies()
-            throws BKAuditException, KeeperException,
-            InterruptedException, BKException {
+            throws BKAuditException, InterruptedException, BKException {
         try {
             waitIfLedgerReplicationDisabled();
         } catch (UnavailableException ue) {
@@ -512,7 +509,12 @@ public class Auditor {
         bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
                 TimeUnit.MILLISECONDS);
         if (lostBookies.size() > 0) {
-            handleLostBookies(lostBookies, ledgerDetails);
+            try {
+                FutureUtils.result(
+                    handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
+            } catch (ReplicationException e) {
+                throw new BKAuditException(e.getMessage(), e.getCause());
+            }
             uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
                     TimeUnit.MILLISECONDS);
         }
@@ -524,38 +526,33 @@ public class Auditor {
         return bookieLedgerIndexer.getBookieToLedgerIndex();
     }
 
-    private void handleLostBookies(Collection<String> lostBookies,
-            Map<String, Set<Long>> ledgerDetails) throws BKAuditException {
-        LOG.info("Following are the failed bookies: " + lostBookies
-                + " and searching its ledgers for re-replication");
-
-        for (String bookieIP : lostBookies) {
-            // identify all the ledgers in bookieIP and publishing these ledgers
-            // as under-replicated.
-            publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP));
-        }
+    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
+                                                        Map<String, Set<Long>> ledgerDetails) {
+        LOG.info("Following are the failed bookies: {},"
+                + " and searching its ledgers for re-replication", lostBookies);
+
+        return FutureUtils.processList(
+            Lists.newArrayList(lostBookies),
+            bookieIP -> publishSuspectedLedgersAsync(
+                Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
+            null
+        );
     }
 
-    private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers)
-            throws BKAuditException {
+    private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
         if (null == ledgers || ledgers.size() == 0) {
             // there is no ledgers available for this bookie and just
             // ignoring the bookie failures
-            LOG.info("There is no ledgers for the failed bookie: {}", bookieIP);
-            return;
+            LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
+            return FutureUtils.Void();
         }
-        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, bookieIP);
+        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
         numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
-        for (Long ledgerId : ledgers) {
-            try {
-                ledgerUnderreplicationManager.markLedgerUnderreplicated(
-                        ledgerId, bookieIP);
-            } catch (UnavailableException ue) {
-                throw new BKAuditException(
-                        "Failed to publish underreplicated ledger: " + ledgerId
-                                + " of bookie: " + bookieIP, ue);
-            }
-        }
+        return FutureUtils.processList(
+            Lists.newArrayList(ledgers),
+            ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
+            null
+        );
     }
 
     /**
@@ -571,36 +568,31 @@ public class Auditor {
         }
 
         public void operationComplete(int rc, Set<LedgerFragment> fragments) {
-            try {
-                if (rc == BKException.Code.OK) {
-                    Set<BookieSocketAddress> bookies = Sets.newHashSet();
-                    for (LedgerFragment f : fragments) {
-                        bookies.addAll(f.getAddresses());
-                    }
-                    for (BookieSocketAddress bookie : bookies) {
-                        publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
-                    }
-                }
-                lh.close();
-            } catch (BKException bke) {
-                LOG.error("Error closing lh", bke);
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.ReplicationException;
-                }
-            } catch (InterruptedException ie) {
-                LOG.error("Interrupted publishing suspected ledger", ie);
-                Thread.currentThread().interrupt();
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.InterruptedException;
-                }
-            } catch (BKAuditException bkae) {
-                LOG.error("Auditor exception publishing suspected ledger", bkae);
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.ReplicationException;
+            if (rc == BKException.Code.OK) {
+                Set<BookieSocketAddress> bookies = Sets.newHashSet();
+                for (LedgerFragment f : fragments) {
+                    bookies.addAll(f.getAddresses());
                 }
+                publishSuspectedLedgersAsync(
+                    bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()),
+                    Sets.newHashSet(lh.getId())
+                ).whenComplete((result, cause) -> {
+                    if (null != cause) {
+                        LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
+                            lh.getId(), bookies, cause);
+                        callback.processResult(Code.ReplicationException, null, null);
+                    } else {
+                        callback.processResult(Code.OK, null, null);
+                    }
+                });
+            } else {
+                callback.processResult(rc, null, null);
             }
-
-            callback.processResult(rc, null, null);
+            lh.closeAsync().whenComplete((result, cause) -> {
+                if (null != cause) {
+                    LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage());
+                }
+            });
         }
     }
 
@@ -608,8 +600,7 @@ public class Auditor {
      * List all the ledgers and check them individually. This should not
      * be run very often.
      */
-    void checkAllLedgers() throws BKAuditException, BKException,
-            IOException, InterruptedException, KeeperException {
+    void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
         ZooKeeper newzk = ZooKeeperClient.newBuilder()
                 .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
                 .sessionTimeoutMs(conf.getZkTimeout())
@@ -622,91 +613,54 @@ public class Auditor {
         try {
             final LedgerChecker checker = new LedgerChecker(client);
 
-            final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
-            final CountDownLatch processDone = new CountDownLatch(1);
+            final CompletableFuture<Void> processFuture = new CompletableFuture<>();
 
-            Processor<Long> checkLedgersProcessor = new Processor<Long>() {
-                @Override
-                public void process(final Long ledgerId,
-                                    final AsyncCallback.VoidCallback callback) {
-                    try {
-                        if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
-                            LOG.info("Ledger rereplication has been disabled, aborting periodic check");
-                            processDone.countDown();
-                            return;
-                        }
-                    } catch (ReplicationException.UnavailableException ue) {
-                        LOG.error("Underreplication manager unavailable running periodic check", ue);
-                        processDone.countDown();
+            Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
+                try {
+                    if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+                        LOG.info("Ledger rereplication has been disabled, aborting periodic check");
+                        FutureUtils.complete(processFuture, null);
                         return;
                     }
+                } catch (UnavailableException ue) {
+                    LOG.error("Underreplication manager unavailable running periodic check", ue);
+                    FutureUtils.complete(processFuture, null);
+                    return;
+                }
 
-                    // Do not perform blocking calls that involve making ZK calls from within the ZK
-                    // event thread. Jump to background thread instead to avoid deadlock.
-                    ForkJoinPool.commonPool().execute(() -> {
-                        LedgerHandle lh = null;
-                        try {
-                            lh = admin.openLedgerNoRecovery(ledgerId);
-                            checker.checkLedger(lh,
-                                    new ProcessLostFragmentsCb(lh, callback),
-                                    conf.getAuditorLedgerVerificationPercentage());
-                            // we collect the following stats to get a measure of the
-                            // distribution of a single ledger within the bk cluster
-                            // the higher the number of fragments/bookies, the more distributed it is
-                            numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
-                            numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
-                            numLedgersChecked.inc();
-                        } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Ledger was deleted before we could check it", bknsle);
-                            }
-                            callback.processResult(BKException.Code.OK,
-                                    null, null);
-                            return;
-                        } catch (BKException bke) {
-                            LOG.error("Couldn't open ledger " + ledgerId, bke);
-                            callback.processResult(BKException.Code.BookieHandleNotAvailableException,
-                                    null, null);
-                            return;
-                        } catch (InterruptedException ie) {
-                            LOG.error("Interrupted opening ledger", ie);
-                            Thread.currentThread().interrupt();
-                            callback.processResult(BKException.Code.InterruptedException, null, null);
-                            return;
-                        } finally {
-                            if (lh != null) {
-                                try {
-                                    lh.close();
-                                } catch (BKException bke) {
-                                    LOG.warn("Couldn't close ledger " + ledgerId, bke);
-                                } catch (InterruptedException ie) {
-                                    LOG.warn("Interrupted closing ledger " + ledgerId, ie);
-                                    Thread.currentThread().interrupt();
-                                }
-                            }
+                admin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
+                    if (Code.OK == rc) {
+                        checker.checkLedger(lh,
+                                // the ledger handle will be closed after checkLedger is done.
+                                new ProcessLostFragmentsCb(lh, callback),
+                                conf.getAuditorLedgerVerificationPercentage());
+                        // we collect the following stats to get a measure of the
+                        // distribution of a single ledger within the bk cluster
+                        // the higher the number of fragments/bookies, the more distributed it is
+                        numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+                        numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+                        numLedgersChecked.inc();
+                    } else if (Code.NoSuchLedgerExistsException == rc) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
                         }
-                    });
-                }
+                        callback.processResult(Code.OK, null, null);
+                    } else {
+                        LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc));
+                        callback.processResult(rc, null, null);
+                    }
+                }, null);
             };
 
             ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
-                    new AsyncCallback.VoidCallback() {
-                        @Override
-                        public void processResult(int rc, String s, Object obj) {
-                            returnCode.set(rc);
-                            processDone.countDown();
-                        }
-                    }, null, BKException.Code.OK, BKException.Code.ReadException);
-            try {
-                processDone.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new BKAuditException(
-                        "Exception while checking ledgers", e);
-            }
-            if (returnCode.get() != BKException.Code.OK) {
-                throw BKException.create(returnCode.get());
-            }
+                (rc, path, ctx) -> {
+                    if (Code.OK == rc) {
+                        FutureUtils.complete(processFuture, null);
+                    } else {
+                        FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
+                    }
+                }, null, BKException.Code.OK, BKException.Code.ReadException);
+            FutureUtils.result(processFuture, BKException.HANDLER);
         } finally {
             admin.close();
             client.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 1afee69..733f63b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -18,10 +18,21 @@
 
 package org.apache.bookkeeper.replication;
 
+import java.util.function.Function;
+
 /**
  * Exceptions for use within the replication service.
  */
 public abstract class ReplicationException extends Exception {
+
+    public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> {
+        if (cause instanceof ReplicationException) {
+            return (ReplicationException) cause;
+        } else {
+            return new UnavailableException(cause.getMessage(), cause);
+        }
+    };
+
     protected ReplicationException(String message, Throwable cause) {
         super(message, cause);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index 229af16..214777e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -52,7 +52,6 @@ import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
-import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -724,8 +723,7 @@ public class TestLedgerUnderreplicationManager {
     }
 
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
-            throws KeeperException, InterruptedException,
-            CompatibilityException, UnavailableException {
+            throws KeeperException, InterruptedException, ReplicationException {
         Long ledgerA = 0xfeadeefdacL;
         String znodeA = getUrLedgerZnode(ledgerA);
         LedgerUnderreplicationManager replicaMgr = lmf1