You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/27 02:51:20 UTC

[GitHub] sijie closed pull request #1619: Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager

sijie closed pull request #1619: Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager
URL: https://github.com/apache/bookkeeper/pull/1619
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff once a pull request is merged, the diff
is reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 1a15ebf370..ddfc795acb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.client;
 
+import java.util.function.Function;
+
 /**
  * Class the enumerates all the possible error conditions.
  *
@@ -28,6 +30,19 @@
 @SuppressWarnings("serial")
 public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {
 
+    public static final Function<Throwable, BKException> HANDLER = cause -> {
+        if (cause == null) {
+            return null;
+        }
+        if (cause instanceof BKException) {
+            return (BKException) cause;
+        } else {
+            BKException ex = new BKUnexpectedConditionException();
+            ex.initCause(cause);
+            return ex;
+        }
+    };
+
     BKException(int code) {
         super(code);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index ed8715481a..b5447beee2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -17,10 +17,14 @@
  */
 package org.apache.bookkeeper.meta;
 
+import com.google.common.collect.Lists;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Predicate;
 
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.ReplicationException;
 
@@ -28,12 +32,26 @@
  * Interface for marking ledgers which need to be rereplicated.
  */
 public interface LedgerUnderreplicationManager extends AutoCloseable {
+
     /**
      * Mark a ledger as underreplicated. The replication should
      * then check which fragments are underreplicated and rereplicate them
      */
-    void markLedgerUnderreplicated(long ledgerId, String missingReplica)
-            throws ReplicationException.UnavailableException;
+    default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException {
+        FutureUtils.result(
+            markLedgerUnderreplicatedAsync(
+                ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER);
+    }
+
+    /**
+     * Mark a ledger as underreplicated with missing bookies. The replication should then
+     * check which fragements are underreplicated and rereplicate them.
+     *
+     * @param ledgerId ledger id
+     * @param missingReplicas missing replicas
+     * @return a future presents the mark result.
+     */
+    CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas);
 
     /**
      * Mark a ledger as fully replicated. If the ledger is not
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 00215f08a1..32723706fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -27,18 +27,21 @@
 
 import java.net.UnknownHostException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.DNS;
@@ -54,6 +57,7 @@
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -254,58 +258,95 @@ public UnderreplicatedLedgerFormat getLedgerUnreplicationInfo(long ledgerId)
     }
 
     @Override
-    public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
-            throws ReplicationException.UnavailableException {
+    public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
-        }
-        try {
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            String znode = getUrLedgerZnode(ledgerId);
-            while (true) {
-                UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+            LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
+        }
+        final List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        final String znode = getUrLedgerZnode(ledgerId);
+        final CompletableFuture<Void> createFuture = new CompletableFuture<>();
+        tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
+        return createFuture;
+    }
+
+    private void tryMarkLedgerUnderreplicatedAsync(final String znode,
+                                                   final Collection<String> missingReplicas,
+                                                   final List<ACL> zkAcls,
+                                                   final CompletableFuture<Void> finalFuture) {
+        final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+        if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
+            builder.setCtime(System.currentTimeMillis());
+        }
+        missingReplicas.forEach(builder::addReplica);
+        final byte[] urLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+        ZkUtils.asyncCreateFullPathOptimistic(
+            zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
+            (rc, path, ctx, name) -> {
+                if (Code.OK.intValue() == rc) {
+                    FutureUtils.complete(finalFuture, null);
+                } else if (Code.NODEEXISTS.intValue() == rc) {
+                    // we need to handle the case where the ledger has been marked as underreplicated
+                    handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+                } else {
+                    FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
+                }
+            }, null);
+    }
+
+
+    private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
+                                                          final Collection<String> missingReplicas,
+                                                          final List<ACL> zkAcls,
+                                                          final CompletableFuture<Void> finalFuture) {
+        // get the existing underreplicated ledger data
+        zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
+            if (Code.OK.intValue() == getRc) {
+                // deserialize existing underreplicated ledger data
+                final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
                 try {
-                    if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
-                        builder.setCtime(System.currentTimeMillis());
-                    }
-                    builder.addReplica(missingReplica);
-                    ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
-                            .printToString(builder.build()).getBytes(UTF_8),
-                            zkAcls, CreateMode.PERSISTENT);
-                } catch (KeeperException.NodeExistsException nee) {
-                    Stat s = zkc.exists(znode, false);
-                    if (s == null) {
+                    TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
+                } catch (ParseException e) {
+                    // corrupted metadata in zookeeper
+                    FutureUtils.completeExceptionally(finalFuture,
+                        new ReplicationException.UnavailableException(
+                            "Invalid underreplicated ledger data for ledger " + znode, e));
+                    return;
+                }
+                UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
+                boolean replicaAdded = false;
+                for (String missingReplica : missingReplicas) {
+                    if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
                         continue;
-                    }
-                    try {
-                        byte[] bytes = zkc.getData(znode, false, s);
-                        builder.clear();
-                        TextFormat.merge(new String(bytes, UTF_8), builder);
-                        UnderreplicatedLedgerFormat data = builder.build();
-                        if (data.getReplicaList().contains(missingReplica)) {
-                            return; // nothing to add
-                        }
+                    } else {
                         builder.addReplica(missingReplica);
-                        zkc.setData(znode,
-                                    TextFormat.printToString(builder.build()).getBytes(UTF_8),
-                                    s.getVersion());
-                    } catch (KeeperException.NoNodeException nne) {
-                        continue;
-                    } catch (KeeperException.BadVersionException bve) {
-                        continue;
-                    } catch (TextFormat.ParseException pe) {
-                        throw new ReplicationException.UnavailableException(
-                                "Invalid data found", pe);
+                        replicaAdded = true;
                     }
                 }
-                break;
+                if (!replicaAdded) { // no new missing replica is added
+                    FutureUtils.complete(finalFuture, null);
+                    return;
+                }
+                if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
+                    builder.setCtime(System.currentTimeMillis());
+                }
+                final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+                zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
+                    if (Code.OK.intValue() == setRc) {
+                        FutureUtils.complete(finalFuture, null);
+                    } else if (Code.NONODE.intValue() == setRc) {
+                        tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+                    } else if (Code.BADVERSION.intValue() == setRc) {
+                        handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+                    } else {
+                        FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
+                    }
+                }, null);
+            } else if (Code.NONODE.intValue() == getRc) {
+                tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+            } else {
+                FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
             }
-        } catch (KeeperException ke) {
-            throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
-        }
+        }, null);
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 920ac57c7c..788aaa2764 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -22,6 +22,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.SettableFuture;
 import java.io.IOException;
@@ -30,20 +31,21 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
@@ -366,8 +368,6 @@ public void run() {
                             } catch (InterruptedException ie) {
                                 Thread.currentThread().interrupt();
                                 LOG.error("Interrupted while running periodic check", ie);
-                            } catch (BKAuditException bkae) {
-                                LOG.error("Exception while running periodic check", bkae);
                             } catch (BKException bke) {
                                 LOG.error("Exception running periodic check", bke);
                             } catch (IOException ioe) {
@@ -467,8 +467,6 @@ private void startAudit(boolean shutDownTask) {
             LOG.error("Interrupted while watching available bookies ", ie);
         } catch (BKAuditException bke) {
             LOG.error("Exception while watching available bookies", bke);
-        } catch (KeeperException ke) {
-            LOG.error("Exception reading bookie list", ke);
         }
         if (shutDownTask) {
             submitShutdownTask();
@@ -477,8 +475,7 @@ private void startAudit(boolean shutDownTask) {
 
     @SuppressWarnings("unchecked")
     private void auditBookies()
-            throws BKAuditException, KeeperException,
-            InterruptedException, BKException {
+            throws BKAuditException, InterruptedException, BKException {
         try {
             waitIfLedgerReplicationDisabled();
         } catch (UnavailableException ue) {
@@ -512,7 +509,12 @@ private void auditBookies()
         bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
                 TimeUnit.MILLISECONDS);
         if (lostBookies.size() > 0) {
-            handleLostBookies(lostBookies, ledgerDetails);
+            try {
+                FutureUtils.result(
+                    handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
+            } catch (ReplicationException e) {
+                throw new BKAuditException(e.getMessage(), e.getCause());
+            }
             uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
                     TimeUnit.MILLISECONDS);
         }
@@ -524,38 +526,33 @@ private void auditBookies()
         return bookieLedgerIndexer.getBookieToLedgerIndex();
     }
 
-    private void handleLostBookies(Collection<String> lostBookies,
-            Map<String, Set<Long>> ledgerDetails) throws BKAuditException {
-        LOG.info("Following are the failed bookies: " + lostBookies
-                + " and searching its ledgers for re-replication");
-
-        for (String bookieIP : lostBookies) {
-            // identify all the ledgers in bookieIP and publishing these ledgers
-            // as under-replicated.
-            publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP));
-        }
+    private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
+                                                        Map<String, Set<Long>> ledgerDetails) {
+        LOG.info("Following are the failed bookies: {},"
+                + " and searching its ledgers for re-replication", lostBookies);
+
+        return FutureUtils.processList(
+            Lists.newArrayList(lostBookies),
+            bookieIP -> publishSuspectedLedgersAsync(
+                Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
+            null
+        );
     }
 
-    private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers)
-            throws BKAuditException {
+    private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
         if (null == ledgers || ledgers.size() == 0) {
             // there is no ledgers available for this bookie and just
             // ignoring the bookie failures
-            LOG.info("There is no ledgers for the failed bookie: {}", bookieIP);
-            return;
+            LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
+            return FutureUtils.Void();
         }
-        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, bookieIP);
+        LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
         numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
-        for (Long ledgerId : ledgers) {
-            try {
-                ledgerUnderreplicationManager.markLedgerUnderreplicated(
-                        ledgerId, bookieIP);
-            } catch (UnavailableException ue) {
-                throw new BKAuditException(
-                        "Failed to publish underreplicated ledger: " + ledgerId
-                                + " of bookie: " + bookieIP, ue);
-            }
-        }
+        return FutureUtils.processList(
+            Lists.newArrayList(ledgers),
+            ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
+            null
+        );
     }
 
     /**
@@ -571,36 +568,31 @@ private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers)
         }
 
         public void operationComplete(int rc, Set<LedgerFragment> fragments) {
-            try {
-                if (rc == BKException.Code.OK) {
-                    Set<BookieSocketAddress> bookies = Sets.newHashSet();
-                    for (LedgerFragment f : fragments) {
-                        bookies.addAll(f.getAddresses());
-                    }
-                    for (BookieSocketAddress bookie : bookies) {
-                        publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
-                    }
-                }
-                lh.close();
-            } catch (BKException bke) {
-                LOG.error("Error closing lh", bke);
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.ReplicationException;
-                }
-            } catch (InterruptedException ie) {
-                LOG.error("Interrupted publishing suspected ledger", ie);
-                Thread.currentThread().interrupt();
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.InterruptedException;
-                }
-            } catch (BKAuditException bkae) {
-                LOG.error("Auditor exception publishing suspected ledger", bkae);
-                if (rc == BKException.Code.OK) {
-                    rc = BKException.Code.ReplicationException;
+            if (rc == BKException.Code.OK) {
+                Set<BookieSocketAddress> bookies = Sets.newHashSet();
+                for (LedgerFragment f : fragments) {
+                    bookies.addAll(f.getAddresses());
                 }
+                publishSuspectedLedgersAsync(
+                    bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()),
+                    Sets.newHashSet(lh.getId())
+                ).whenComplete((result, cause) -> {
+                    if (null != cause) {
+                        LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
+                            lh.getId(), bookies, cause);
+                        callback.processResult(Code.ReplicationException, null, null);
+                    } else {
+                        callback.processResult(Code.OK, null, null);
+                    }
+                });
+            } else {
+                callback.processResult(rc, null, null);
             }
-
-            callback.processResult(rc, null, null);
+            lh.closeAsync().whenComplete((result, cause) -> {
+                if (null != cause) {
+                    LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage());
+                }
+            });
         }
     }
 
@@ -608,8 +600,7 @@ public void operationComplete(int rc, Set<LedgerFragment> fragments) {
      * List all the ledgers and check them individually. This should not
      * be run very often.
      */
-    void checkAllLedgers() throws BKAuditException, BKException,
-            IOException, InterruptedException, KeeperException {
+    void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
         ZooKeeper newzk = ZooKeeperClient.newBuilder()
                 .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
                 .sessionTimeoutMs(conf.getZkTimeout())
@@ -622,91 +613,54 @@ void checkAllLedgers() throws BKAuditException, BKException,
         try {
             final LedgerChecker checker = new LedgerChecker(client);
 
-            final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
-            final CountDownLatch processDone = new CountDownLatch(1);
+            final CompletableFuture<Void> processFuture = new CompletableFuture<>();
 
-            Processor<Long> checkLedgersProcessor = new Processor<Long>() {
-                @Override
-                public void process(final Long ledgerId,
-                                    final AsyncCallback.VoidCallback callback) {
-                    try {
-                        if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
-                            LOG.info("Ledger rereplication has been disabled, aborting periodic check");
-                            processDone.countDown();
-                            return;
-                        }
-                    } catch (ReplicationException.UnavailableException ue) {
-                        LOG.error("Underreplication manager unavailable running periodic check", ue);
-                        processDone.countDown();
+            Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
+                try {
+                    if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+                        LOG.info("Ledger rereplication has been disabled, aborting periodic check");
+                        FutureUtils.complete(processFuture, null);
                         return;
                     }
+                } catch (UnavailableException ue) {
+                    LOG.error("Underreplication manager unavailable running periodic check", ue);
+                    FutureUtils.complete(processFuture, null);
+                    return;
+                }
 
-                    // Do not perform blocking calls that involve making ZK calls from within the ZK
-                    // event thread. Jump to background thread instead to avoid deadlock.
-                    ForkJoinPool.commonPool().execute(() -> {
-                        LedgerHandle lh = null;
-                        try {
-                            lh = admin.openLedgerNoRecovery(ledgerId);
-                            checker.checkLedger(lh,
-                                    new ProcessLostFragmentsCb(lh, callback),
-                                    conf.getAuditorLedgerVerificationPercentage());
-                            // we collect the following stats to get a measure of the
-                            // distribution of a single ledger within the bk cluster
-                            // the higher the number of fragments/bookies, the more distributed it is
-                            numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
-                            numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
-                            numLedgersChecked.inc();
-                        } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Ledger was deleted before we could check it", bknsle);
-                            }
-                            callback.processResult(BKException.Code.OK,
-                                    null, null);
-                            return;
-                        } catch (BKException bke) {
-                            LOG.error("Couldn't open ledger " + ledgerId, bke);
-                            callback.processResult(BKException.Code.BookieHandleNotAvailableException,
-                                    null, null);
-                            return;
-                        } catch (InterruptedException ie) {
-                            LOG.error("Interrupted opening ledger", ie);
-                            Thread.currentThread().interrupt();
-                            callback.processResult(BKException.Code.InterruptedException, null, null);
-                            return;
-                        } finally {
-                            if (lh != null) {
-                                try {
-                                    lh.close();
-                                } catch (BKException bke) {
-                                    LOG.warn("Couldn't close ledger " + ledgerId, bke);
-                                } catch (InterruptedException ie) {
-                                    LOG.warn("Interrupted closing ledger " + ledgerId, ie);
-                                    Thread.currentThread().interrupt();
-                                }
-                            }
+                admin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
+                    if (Code.OK == rc) {
+                        checker.checkLedger(lh,
+                                // the ledger handle will be closed after checkLedger is done.
+                                new ProcessLostFragmentsCb(lh, callback),
+                                conf.getAuditorLedgerVerificationPercentage());
+                        // we collect the following stats to get a measure of the
+                        // distribution of a single ledger within the bk cluster
+                        // the higher the number of fragments/bookies, the more distributed it is
+                        numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+                        numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+                        numLedgersChecked.inc();
+                    } else if (Code.NoSuchLedgerExistsException == rc) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
                         }
-                    });
-                }
+                        callback.processResult(Code.OK, null, null);
+                    } else {
+                        LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc));
+                        callback.processResult(rc, null, null);
+                    }
+                }, null);
             };
 
             ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
-                    new AsyncCallback.VoidCallback() {
-                        @Override
-                        public void processResult(int rc, String s, Object obj) {
-                            returnCode.set(rc);
-                            processDone.countDown();
-                        }
-                    }, null, BKException.Code.OK, BKException.Code.ReadException);
-            try {
-                processDone.await();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new BKAuditException(
-                        "Exception while checking ledgers", e);
-            }
-            if (returnCode.get() != BKException.Code.OK) {
-                throw BKException.create(returnCode.get());
-            }
+                (rc, path, ctx) -> {
+                    if (Code.OK == rc) {
+                        FutureUtils.complete(processFuture, null);
+                    } else {
+                        FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
+                    }
+                }, null, BKException.Code.OK, BKException.Code.ReadException);
+            FutureUtils.result(processFuture, BKException.HANDLER);
         } finally {
             admin.close();
             client.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 1afee699bf..733f63bde8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -18,10 +18,21 @@
 
 package org.apache.bookkeeper.replication;
 
+import java.util.function.Function;
+
 /**
  * Exceptions for use within the replication service.
  */
 public abstract class ReplicationException extends Exception {
+
+    public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> {
+        if (cause instanceof ReplicationException) {
+            return (ReplicationException) cause;
+        } else {
+            return new UnavailableException(cause.getMessage(), cause);
+        }
+    };
+
     protected ReplicationException(String message, Throwable cause) {
         super(message, cause);
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index 5c7d5e6503..db0e308eb4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -53,7 +53,6 @@
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.DNS;
 import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
-import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -766,8 +765,7 @@ public void run() {
     }
 
     private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
-            throws KeeperException, InterruptedException,
-            CompatibilityException, UnavailableException {
+            throws KeeperException, InterruptedException, ReplicationException {
         Long ledgerA = 0xfeadeefdacL;
         String znodeA = getUrLedgerZnode(ledgerA);
         LedgerUnderreplicationManager replicaMgr = lmf1


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services