You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/08/27 03:49:37 UTC
[bookkeeper] branch branch-4.7 updated: Provide async version of
markLedgerUnderreplicated for LedgerUnderreplicationManager
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push:
new 3e01125 Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager
3e01125 is described below
commit 3e0112505381b3eb922f21de1f18daf302002201
Author: Sijie Guo <si...@apache.org>
AuthorDate: Sun Aug 26 19:51:08 2018 -0700
Provide async version of markLedgerUnderreplicated for LedgerUnderreplicationManager
Descriptions of the changes in this PR:
### Motivation
The Auditor calls synchronous methods from within asynchronous callbacks in several places.
This raises the possibility of hitting a deadlock. Issue #1578 is one example.
After looking into `LedgerUnderreplicationManager`, `markLedgerUnderreplicated`
is the only method of that interface that is called from async callbacks. This change
provides an async version of `markLedgerUnderreplicated`.
### Changes
- add a `markLedgerUnderreplicatedAsync` method to the `LedgerUnderreplicationManager` interface.
- reimplement the logic of `markLedgerUnderreplicated` using async callbacks.
- use `markLedgerUnderreplicatedAsync` in the Auditor.
Related Issues: #1578
Master Issue: #1617
Author: Sijie Guo <si...@apache.org>
Reviewers: Charan Reddy Guttapalem <re...@gmail.com>, Enrico Olivelli <eo...@gmail.com>, Matteo Merli <mm...@apache.org>
This closes #1619 from sijie/async_sync_autorecovery
---
.../org/apache/bookkeeper/client/BKException.java | 15 ++
.../meta/LedgerUnderreplicationManager.java | 22 +-
.../meta/ZkLedgerUnderreplicationManager.java | 123 +++++++----
.../org/apache/bookkeeper/replication/Auditor.java | 238 +++++++++------------
.../replication/ReplicationException.java | 11 +
.../TestLedgerUnderreplicationManager.java | 4 +-
6 files changed, 224 insertions(+), 189 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
index 1a15ebf..ddfc795 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BKException.java
@@ -20,6 +20,8 @@
*/
package org.apache.bookkeeper.client;
+import java.util.function.Function;
+
/**
* Class the enumerates all the possible error conditions.
*
@@ -28,6 +30,19 @@ package org.apache.bookkeeper.client;
@SuppressWarnings("serial")
public abstract class BKException extends org.apache.bookkeeper.client.api.BKException {
+ public static final Function<Throwable, BKException> HANDLER = cause -> {
+ if (cause == null) {
+ return null;
+ }
+ if (cause instanceof BKException) {
+ return (BKException) cause;
+ } else {
+ BKException ex = new BKUnexpectedConditionException();
+ ex.initCause(cause);
+ return ex;
+ }
+ };
+
BKException(int code) {
super(code);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
index 21c6a4f..5bad3cc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerUnderreplicationManager.java
@@ -17,11 +17,15 @@
*/
package org.apache.bookkeeper.meta;
+import com.google.common.collect.Lists;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.replication.ReplicationException;
@@ -29,12 +33,26 @@ import org.apache.bookkeeper.replication.ReplicationException;
* Interface for marking ledgers which need to be rereplicated.
*/
public interface LedgerUnderreplicationManager extends AutoCloseable {
+
/**
* Mark a ledger as underreplicated. The replication should
* then check which fragments are underreplicated and rereplicate them
*/
- void markLedgerUnderreplicated(long ledgerId, String missingReplica)
- throws ReplicationException.UnavailableException;
+ default void markLedgerUnderreplicated(long ledgerId, String missingReplica) throws ReplicationException {
+ FutureUtils.result(
+ markLedgerUnderreplicatedAsync(
+ ledgerId, Lists.newArrayList(missingReplica)), ReplicationException.EXCEPTION_HANDLER);
+ }
+
+ /**
+ * Mark a ledger as underreplicated with missing bookies. The replication should then
+ * check which fragments are underreplicated and rereplicate them.
+ *
+ * @param ledgerId ledger id
+ * @param missingReplicas missing replicas
+ * @return a future that represents the result of the mark operation.
+ */
+ CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas);
/**
* Mark a ledger as fully replicated. If the ledger is not
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
index 4289bcd..db09df9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerUnderreplicationManager.java
@@ -24,21 +24,25 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.protobuf.TextFormat;
+import com.google.protobuf.TextFormat.ParseException;
import java.net.UnknownHostException;
import java.util.AbstractMap;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.DNS;
@@ -54,6 +58,7 @@ import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -254,55 +259,89 @@ public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationMa
}
@Override
- public void markLedgerUnderreplicated(long ledgerId, String missingReplica)
- throws ReplicationException.UnavailableException {
+ public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
if (LOG.isDebugEnabled()) {
- LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplica);
- }
- try {
- List<ACL> zkAcls = ZkUtils.getACLs(conf);
- String znode = getUrLedgerZnode(ledgerId);
- while (true) {
- UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+ LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
+ }
+ final List<ACL> zkAcls = ZkUtils.getACLs(conf);
+ final String znode = getUrLedgerZnode(ledgerId);
+ final CompletableFuture<Void> createFuture = new CompletableFuture<>();
+ tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
+ return createFuture;
+ }
+
+ private void tryMarkLedgerUnderreplicatedAsync(final String znode,
+ final Collection<String> missingReplicas,
+ final List<ACL> zkAcls,
+ final CompletableFuture<Void> finalFuture) {
+ final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
+ missingReplicas.forEach(builder::addReplica);
+ final byte[] urLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+ ZkUtils.asyncCreateFullPathOptimistic(
+ zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
+ (rc, path, ctx, name) -> {
+ if (Code.OK.intValue() == rc) {
+ FutureUtils.complete(finalFuture, null);
+ } else if (Code.NODEEXISTS.intValue() == rc) {
+ // we need to handle the case where the ledger has been marked as underreplicated
+ handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+ } else {
+ FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
+ }
+ }, null);
+ }
+
+
+ private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
+ final Collection<String> missingReplicas,
+ final List<ACL> zkAcls,
+ final CompletableFuture<Void> finalFuture) {
+ // get the existing underreplicated ledger data
+ zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
+ if (Code.OK.intValue() == getRc) {
+ // deserialize existing underreplicated ledger data
+ final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
- builder.addReplica(missingReplica);
- ZkUtils.createFullPathOptimistic(zkc, znode, TextFormat
- .printToString(builder.build()).getBytes(UTF_8),
- zkAcls, CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- Stat s = zkc.exists(znode, false);
- if (s == null) {
+ TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
+ } catch (ParseException e) {
+ // corrupted metadata in zookeeper
+ FutureUtils.completeExceptionally(finalFuture,
+ new ReplicationException.UnavailableException(
+ "Invalid underreplicated ledger data for ledger " + znode, e));
+ return;
+ }
+ UnderreplicatedLedgerFormat existingUrLedgerFormat = builder.build();
+ boolean replicaAdded = false;
+ for (String missingReplica : missingReplicas) {
+ if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
continue;
- }
- try {
- byte[] bytes = zkc.getData(znode, false, s);
- builder.clear();
- TextFormat.merge(new String(bytes, UTF_8), builder);
- UnderreplicatedLedgerFormat data = builder.build();
- if (data.getReplicaList().contains(missingReplica)) {
- return; // nothing to add
- }
+ } else {
builder.addReplica(missingReplica);
- zkc.setData(znode,
- TextFormat.printToString(builder.build()).getBytes(UTF_8),
- s.getVersion());
- } catch (KeeperException.NoNodeException nne) {
- continue;
- } catch (KeeperException.BadVersionException bve) {
- continue;
- } catch (TextFormat.ParseException pe) {
- throw new ReplicationException.UnavailableException(
- "Invalid data found", pe);
+ replicaAdded = true;
}
}
- break;
+ if (!replicaAdded) { // no new missing replica is added
+ FutureUtils.complete(finalFuture, null);
+ return;
+ }
+ final byte[] newUrLedgerData = TextFormat.printToString(builder.build()).getBytes(UTF_8);
+ zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
+ if (Code.OK.intValue() == setRc) {
+ FutureUtils.complete(finalFuture, null);
+ } else if (Code.NONODE.intValue() == setRc) {
+ tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+ } else if (Code.BADVERSION.intValue() == setRc) {
+ handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
+ } else {
+ FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
+ }
+ }, null);
+ } else if (Code.NONODE.intValue() == getRc) {
+ tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
+ } else {
+ FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
}
- } catch (KeeperException ke) {
- throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
- }
+ }, null);
}
@Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 8578a5b..f937632 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.replication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
@@ -30,20 +31,21 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.client.LedgerChecker;
import org.apache.bookkeeper.client.LedgerFragment;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
@@ -366,8 +368,6 @@ public class Auditor {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while running periodic check", ie);
- } catch (BKAuditException bkae) {
- LOG.error("Exception while running periodic check", bkae);
} catch (BKException bke) {
LOG.error("Exception running periodic check", bke);
} catch (IOException ioe) {
@@ -467,8 +467,6 @@ public class Auditor {
LOG.error("Interrupted while watching available bookies ", ie);
} catch (BKAuditException bke) {
LOG.error("Exception while watching available bookies", bke);
- } catch (KeeperException ke) {
- LOG.error("Exception reading bookie list", ke);
}
if (shutDownTask) {
submitShutdownTask();
@@ -477,8 +475,7 @@ public class Auditor {
@SuppressWarnings("unchecked")
private void auditBookies()
- throws BKAuditException, KeeperException,
- InterruptedException, BKException {
+ throws BKAuditException, InterruptedException, BKException {
try {
waitIfLedgerReplicationDisabled();
} catch (UnavailableException ue) {
@@ -512,7 +509,12 @@ public class Auditor {
bookieToLedgersMapCreationTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
if (lostBookies.size() > 0) {
- handleLostBookies(lostBookies, ledgerDetails);
+ try {
+ FutureUtils.result(
+ handleLostBookiesAsync(lostBookies, ledgerDetails), ReplicationException.EXCEPTION_HANDLER);
+ } catch (ReplicationException e) {
+ throw new BKAuditException(e.getMessage(), e.getCause());
+ }
uRLPublishTimeForLostBookies.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
}
@@ -524,38 +526,33 @@ public class Auditor {
return bookieLedgerIndexer.getBookieToLedgerIndex();
}
- private void handleLostBookies(Collection<String> lostBookies,
- Map<String, Set<Long>> ledgerDetails) throws BKAuditException {
- LOG.info("Following are the failed bookies: " + lostBookies
- + " and searching its ledgers for re-replication");
-
- for (String bookieIP : lostBookies) {
- // identify all the ledgers in bookieIP and publishing these ledgers
- // as under-replicated.
- publishSuspectedLedgers(bookieIP, ledgerDetails.get(bookieIP));
- }
+ private CompletableFuture<?> handleLostBookiesAsync(Collection<String> lostBookies,
+ Map<String, Set<Long>> ledgerDetails) {
+ LOG.info("Following are the failed bookies: {},"
+ + " and searching its ledgers for re-replication", lostBookies);
+
+ return FutureUtils.processList(
+ Lists.newArrayList(lostBookies),
+ bookieIP -> publishSuspectedLedgersAsync(
+ Lists.newArrayList(bookieIP), ledgerDetails.get(bookieIP)),
+ null
+ );
}
- private void publishSuspectedLedgers(String bookieIP, Set<Long> ledgers)
- throws BKAuditException {
+ private CompletableFuture<?> publishSuspectedLedgersAsync(Collection<String> missingBookies, Set<Long> ledgers) {
if (null == ledgers || ledgers.size() == 0) {
// there is no ledgers available for this bookie and just
// ignoring the bookie failures
- LOG.info("There is no ledgers for the failed bookie: {}", bookieIP);
- return;
+ LOG.info("There is no ledgers for the failed bookie: {}", missingBookies);
+ return FutureUtils.Void();
}
- LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, bookieIP);
+ LOG.info("Following ledgers: {} of bookie: {} are identified as underreplicated", ledgers, missingBookies);
numUnderReplicatedLedger.registerSuccessfulValue(ledgers.size());
- for (Long ledgerId : ledgers) {
- try {
- ledgerUnderreplicationManager.markLedgerUnderreplicated(
- ledgerId, bookieIP);
- } catch (UnavailableException ue) {
- throw new BKAuditException(
- "Failed to publish underreplicated ledger: " + ledgerId
- + " of bookie: " + bookieIP, ue);
- }
- }
+ return FutureUtils.processList(
+ Lists.newArrayList(ledgers),
+ ledgerId -> ledgerUnderreplicationManager.markLedgerUnderreplicatedAsync(ledgerId, missingBookies),
+ null
+ );
}
/**
@@ -571,36 +568,31 @@ public class Auditor {
}
public void operationComplete(int rc, Set<LedgerFragment> fragments) {
- try {
- if (rc == BKException.Code.OK) {
- Set<BookieSocketAddress> bookies = Sets.newHashSet();
- for (LedgerFragment f : fragments) {
- bookies.addAll(f.getAddresses());
- }
- for (BookieSocketAddress bookie : bookies) {
- publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
- }
- }
- lh.close();
- } catch (BKException bke) {
- LOG.error("Error closing lh", bke);
- if (rc == BKException.Code.OK) {
- rc = BKException.Code.ReplicationException;
- }
- } catch (InterruptedException ie) {
- LOG.error("Interrupted publishing suspected ledger", ie);
- Thread.currentThread().interrupt();
- if (rc == BKException.Code.OK) {
- rc = BKException.Code.InterruptedException;
- }
- } catch (BKAuditException bkae) {
- LOG.error("Auditor exception publishing suspected ledger", bkae);
- if (rc == BKException.Code.OK) {
- rc = BKException.Code.ReplicationException;
+ if (rc == BKException.Code.OK) {
+ Set<BookieSocketAddress> bookies = Sets.newHashSet();
+ for (LedgerFragment f : fragments) {
+ bookies.addAll(f.getAddresses());
}
+ publishSuspectedLedgersAsync(
+ bookies.stream().map(BookieSocketAddress::toString).collect(Collectors.toList()),
+ Sets.newHashSet(lh.getId())
+ ).whenComplete((result, cause) -> {
+ if (null != cause) {
+ LOG.error("Auditor exception publishing suspected ledger {} with lost bookies {}",
+ lh.getId(), bookies, cause);
+ callback.processResult(Code.ReplicationException, null, null);
+ } else {
+ callback.processResult(Code.OK, null, null);
+ }
+ });
+ } else {
+ callback.processResult(rc, null, null);
}
-
- callback.processResult(rc, null, null);
+ lh.closeAsync().whenComplete((result, cause) -> {
+ if (null != cause) {
+ LOG.warn("Error closing ledger {} : {}", lh.getId(), cause.getMessage());
+ }
+ });
}
}
@@ -608,8 +600,7 @@ public class Auditor {
* List all the ledgers and check them individually. This should not
* be run very often.
*/
- void checkAllLedgers() throws BKAuditException, BKException,
- IOException, InterruptedException, KeeperException {
+ void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
ZooKeeper newzk = ZooKeeperClient.newBuilder()
.connectString(ZKMetadataDriverBase.resolveZkServers(conf))
.sessionTimeoutMs(conf.getZkTimeout())
@@ -622,91 +613,54 @@ public class Auditor {
try {
final LedgerChecker checker = new LedgerChecker(client);
- final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
- final CountDownLatch processDone = new CountDownLatch(1);
+ final CompletableFuture<Void> processFuture = new CompletableFuture<>();
- Processor<Long> checkLedgersProcessor = new Processor<Long>() {
- @Override
- public void process(final Long ledgerId,
- final AsyncCallback.VoidCallback callback) {
- try {
- if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
- LOG.info("Ledger rereplication has been disabled, aborting periodic check");
- processDone.countDown();
- return;
- }
- } catch (ReplicationException.UnavailableException ue) {
- LOG.error("Underreplication manager unavailable running periodic check", ue);
- processDone.countDown();
+ Processor<Long> checkLedgersProcessor = (ledgerId, callback) -> {
+ try {
+ if (!ledgerUnderreplicationManager.isLedgerReplicationEnabled()) {
+ LOG.info("Ledger rereplication has been disabled, aborting periodic check");
+ FutureUtils.complete(processFuture, null);
return;
}
+ } catch (UnavailableException ue) {
+ LOG.error("Underreplication manager unavailable running periodic check", ue);
+ FutureUtils.complete(processFuture, null);
+ return;
+ }
- // Do not perform blocking calls that involve making ZK calls from within the ZK
- // event thread. Jump to background thread instead to avoid deadlock.
- ForkJoinPool.commonPool().execute(() -> {
- LedgerHandle lh = null;
- try {
- lh = admin.openLedgerNoRecovery(ledgerId);
- checker.checkLedger(lh,
- new ProcessLostFragmentsCb(lh, callback),
- conf.getAuditorLedgerVerificationPercentage());
- // we collect the following stats to get a measure of the
- // distribution of a single ledger within the bk cluster
- // the higher the number of fragments/bookies, the more distributed it is
- numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
- numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
- numLedgersChecked.inc();
- } catch (BKException.BKNoSuchLedgerExistsException bknsle) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ledger was deleted before we could check it", bknsle);
- }
- callback.processResult(BKException.Code.OK,
- null, null);
- return;
- } catch (BKException bke) {
- LOG.error("Couldn't open ledger " + ledgerId, bke);
- callback.processResult(BKException.Code.BookieHandleNotAvailableException,
- null, null);
- return;
- } catch (InterruptedException ie) {
- LOG.error("Interrupted opening ledger", ie);
- Thread.currentThread().interrupt();
- callback.processResult(BKException.Code.InterruptedException, null, null);
- return;
- } finally {
- if (lh != null) {
- try {
- lh.close();
- } catch (BKException bke) {
- LOG.warn("Couldn't close ledger " + ledgerId, bke);
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted closing ledger " + ledgerId, ie);
- Thread.currentThread().interrupt();
- }
- }
+ admin.asyncOpenLedgerNoRecovery(ledgerId, (rc, lh, ctx) -> {
+ if (Code.OK == rc) {
+ checker.checkLedger(lh,
+ // the ledger handle will be closed after checkLedger is done.
+ new ProcessLostFragmentsCb(lh, callback),
+ conf.getAuditorLedgerVerificationPercentage());
+ // we collect the following stats to get a measure of the
+ // distribution of a single ledger within the bk cluster
+ // the higher the number of fragments/bookies, the more distributed it is
+ numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments());
+ numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies());
+ numLedgersChecked.inc();
+ } else if (Code.NoSuchLedgerExistsException == rc) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ledger {} was deleted before we could check it", ledgerId);
}
- });
- }
+ callback.processResult(Code.OK, null, null);
+ } else {
+ LOG.error("Couldn't open ledger {} to check : {}", ledgerId, BKException.getMessage(rc));
+ callback.processResult(rc, null, null);
+ }
+ }, null);
};
ledgerManager.asyncProcessLedgers(checkLedgersProcessor,
- new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String s, Object obj) {
- returnCode.set(rc);
- processDone.countDown();
- }
- }, null, BKException.Code.OK, BKException.Code.ReadException);
- try {
- processDone.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new BKAuditException(
- "Exception while checking ledgers", e);
- }
- if (returnCode.get() != BKException.Code.OK) {
- throw BKException.create(returnCode.get());
- }
+ (rc, path, ctx) -> {
+ if (Code.OK == rc) {
+ FutureUtils.complete(processFuture, null);
+ } else {
+ FutureUtils.completeExceptionally(processFuture, BKException.create(rc));
+ }
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+ FutureUtils.result(processFuture, BKException.HANDLER);
} finally {
admin.close();
client.close();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
index 1afee69..733f63b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationException.java
@@ -18,10 +18,21 @@
package org.apache.bookkeeper.replication;
+import java.util.function.Function;
+
/**
* Exceptions for use within the replication service.
*/
public abstract class ReplicationException extends Exception {
+
+ public static final Function<Throwable, ReplicationException> EXCEPTION_HANDLER = cause -> {
+ if (cause instanceof ReplicationException) {
+ return (ReplicationException) cause;
+ } else {
+ return new UnavailableException(cause.getMessage(), cause);
+ }
+ };
+
protected ReplicationException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
index 229af16..214777e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java
@@ -52,7 +52,6 @@ import org.apache.bookkeeper.meta.ZkLayoutManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
-import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.BookKeeperConstants;
@@ -724,8 +723,7 @@ public class TestLedgerUnderreplicationManager {
}
private void verifyMarkLedgerUnderreplicated(Collection<String> missingReplica)
- throws KeeperException, InterruptedException,
- CompatibilityException, UnavailableException {
+ throws KeeperException, InterruptedException, ReplicationException {
Long ledgerA = 0xfeadeefdacL;
String znodeA = getUrLedgerZnode(ledgerA);
LedgerUnderreplicationManager replicaMgr = lmf1