You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2017/10/27 23:09:57 UTC
[bookkeeper] branch master updated: ISSUE #596 ISSUE #583: Auto
replication should honor ensemble placement policy
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6fcabfc ISSUE #596 ISSUE #583: Auto replication should honor ensemble placement policy
6fcabfc is described below
commit 6fcabfc80c5bd6cdfa9252cdfc4ed209cc0620a6
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Oct 27 16:09:48 2017 -0700
ISSUE #596 ISSUE #583: Auto replication should honor ensemble placement policy
Descriptions of the changes in this PR:
This pull request ports the changes from [twitter/bookkeeper@fc7e171](https://github.com/twitter/bookkeeper/commit/fc7e171135a58cddb9e91f5f614b3ceb6f9f9fee).
The changes include:
1. when bookkeeper admin re-replicates a ledger, it will pick a bookie from the available bookies in the cluster to satisfy the placement constraint. (#596)
2. hence, remove `targetBookie` from ReplicationWorker, because the parameter would violate the placement constraint. (#583)
3. at the same time, change `LedgerFragment` to represent the number of bookies that need to be checked and replicated, so that: a) the ledger checker can use the correct bookie index for verifying the existence of entries in a bookie (for the striping case); b) entries are read only one time when they need to be replicated to multiple bookies.
Author: Sijie Guo <si...@apache.org>
Reviewers: Ivan Kelly <iv...@apache.org>
This closes #641 from sijie/twitter_autorecovery_fixes, closes #596, closes #583
---
bookkeeper-server/conf/log4j.shell.properties | 7 +
.../apache/bookkeeper/client/BookKeeperAdmin.java | 486 +++++++++++++--------
.../apache/bookkeeper/client/LedgerChecker.java | 167 +++++--
.../apache/bookkeeper/client/LedgerFragment.java | 106 ++++-
.../client/LedgerFragmentReplicator.java | 173 ++++----
.../bookkeeper/conf/ServerConfiguration.java | 21 +
.../org/apache/bookkeeper/replication/Auditor.java | 12 +-
.../bookkeeper/replication/AutoRecoveryMain.java | 9 +-
.../bookkeeper/replication/ReplicationStats.java | 1 +
.../bookkeeper/replication/ReplicationWorker.java | 168 +++----
.../bookkeeper/client/BookKeeperCloseTest.java | 21 +-
.../bookkeeper/client/BookieRecoveryTest.java | 9 +-
.../bookkeeper/client/TestLedgerChecker.java | 55 ++-
.../client/TestLedgerFragmentReplication.java | 15 +-
.../replication/BookieAutoRecoveryTest.java | 4 +-
.../replication/TestReplicationWorker.java | 64 +--
16 files changed, 778 insertions(+), 540 deletions(-)
diff --git a/bookkeeper-server/conf/log4j.shell.properties b/bookkeeper-server/conf/log4j.shell.properties
index 58d6ea6..c2f2e0e 100644
--- a/bookkeeper-server/conf/log4j.shell.properties
+++ b/bookkeeper-server/conf/log4j.shell.properties
@@ -36,6 +36,13 @@ log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ABSOLUTE} %-5p %m%n
+# verbose console logging
+log4j.appender.VERBOSECONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.VERBOSECONSOLE.Threshold=INFO
+log4j.appender.VERBOSECONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.VERBOSECONSOLE.layout.ConversionPattern=%m%n
+
+log4j.logger.verbose=INFO,VERBOSECONSOLE
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.bookkeeper=ERROR
log4j.logger.org.apache.bookkeeper.bookie.BookieShell=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index a0d2fe2..d2e5db3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -23,18 +23,22 @@ package org.apache.bookkeeper.client;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
@@ -70,7 +74,6 @@ import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
@@ -84,7 +87,10 @@ import org.slf4j.LoggerFactory;
* Admin client for BookKeeper clusters
*/
public class BookKeeperAdmin implements AutoCloseable {
+
private final static Logger LOG = LoggerFactory.getLogger(BookKeeperAdmin.class);
+ private final static Logger VERBOSE = LoggerFactory.getLogger("verbose");
+
// ZK client instance
private ZooKeeper zk;
private final boolean ownsZK;
@@ -495,9 +501,20 @@ public class BookKeeperAdmin implements AutoCloseable {
*/
public void recoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest)
throws InterruptedException, BKException {
+ Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+ recoverBookieData(bookiesSrc);
+ }
+
+ public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc)
+ throws InterruptedException, BKException {
+ recoverBookieData(bookiesSrc, false, false);
+ }
+
+ public void recoverBookieData(final Set<BookieSocketAddress> bookiesSrc, boolean dryrun, boolean skipOpenLedgers)
+ throws InterruptedException, BKException {
SyncObject sync = new SyncObject();
// Call the async method to recover bookie data.
- asyncRecoverBookieData(bookieSrc, bookieDest, new RecoverCallback() {
+ asyncRecoverBookieData(bookiesSrc, dryrun, skipOpenLedgers, new RecoverCallback() {
@Override
public void recoverComplete(int rc, Object ctx) {
LOG.info("Recover bookie operation completed with rc: " + rc);
@@ -520,7 +537,7 @@ public class BookKeeperAdmin implements AutoCloseable {
throw BKException.create(sync.rc);
}
}
-
+
/**
* Async method to rebuild and recover the ledger fragments data that was
* stored on the source bookie. That bookie could have failed completely and
@@ -535,89 +552,26 @@ public class BookKeeperAdmin implements AutoCloseable {
* @param bookieSrc
* Source bookie that had a failure. We want to replicate the
* ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
* @param cb
* RecoverCallback to invoke once all of the data on the dead
* bookie has been recovered and replicated.
* @param context
* Context for the RecoverCallback to call.
*/
- public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
+ public void asyncRecoverBookieData(final BookieSocketAddress bookieSrc,
final RecoverCallback cb, final Object context) {
- // Sync ZK to make sure we're reading the latest bookie data.
- zk.sync(bookiesPath, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error syncing: ", KeeperException.create(KeeperException.Code.get(rc), path));
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- getAvailableBookies(bookieSrc, bookieDest, cb, context);
- }
+ Set<BookieSocketAddress> bookiesSrc = Sets.newHashSet(bookieSrc);
+ asyncRecoverBookieData(bookiesSrc, cb, context);
+ }
- }, null);
+ public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc,
+ final RecoverCallback cb, final Object context) {
+ asyncRecoverBookieData(bookieSrc, false, false, cb, context);
}
- /**
- * This method asynchronously gets the set of available Bookies that the
- * dead input bookie's data will be copied over into. If the user passed in
- * a specific destination bookie, then just use that one. Otherwise, we'll
- * randomly pick one of the other available bookies to use for each ledger
- * fragment we are replicating.
- *
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
- * ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
- * @param cb
- * RecoverCallback to invoke once all of the data on the dead
- * bookie has been recovered and replicated.
- * @param context
- * Context for the RecoverCallback to call.
- */
- private void getAvailableBookies(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
- final RecoverCallback cb, final Object context) {
- final List<BookieSocketAddress> availableBookies = new LinkedList<BookieSocketAddress>();
- if (bookieDest != null) {
- availableBookies.add(bookieDest);
- // Now poll ZK to get the active ledgers
- getActiveLedgers(bookieSrc, bookieDest, cb, context, availableBookies);
- } else {
- zk.getChildren(bookiesPath, null, new AsyncCallback.ChildrenCallback() {
- @Override
- public void processResult(int rc, String path, Object ctx, List<String> children) {
- if (rc != Code.OK.intValue()) {
- LOG.error("ZK error getting bookie nodes: ", KeeperException.create(KeeperException.Code
- .get(rc), path));
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- for (String bookieNode : children) {
- if (BookKeeperConstants.READONLY
- .equals(bookieNode)) {
- // exclude the readonly node from available bookies.
- continue;
- }
- BookieSocketAddress addr;
- try {
- addr = new BookieSocketAddress(bookieNode);
- } catch (UnknownHostException nhe) {
- LOG.error("Bookie Node retrieved from ZK has invalid name format: " + bookieNode);
- cb.recoverComplete(BKException.Code.ZKException, context);
- return;
- }
- availableBookies.add(addr);
- }
- // Now poll ZK to get the active ledgers
- getActiveLedgers(bookieSrc, null, cb, context, availableBookies);
- }
- }, null);
- }
+ public void asyncRecoverBookieData(final Set<BookieSocketAddress> bookieSrc, boolean dryrun,
+ final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
+ getActiveLedgers(bookieSrc, dryrun, skipOpenLedgers, cb, context);
}
/**
@@ -626,25 +580,21 @@ public class BookKeeperAdmin implements AutoCloseable {
* determine if any of the ledger fragments for it were stored at the dead
* input bookie.
*
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
+ * @param bookiesSrc
+ * Source bookies that had a failure. We want to replicate the
* ledger fragments that were stored there.
- * @param bookieDest
- * Optional destination bookie that if passed, we will copy all
- * of the ledger fragments from the source bookie over to it.
+ * @param dryrun
+ * dryrun the recover procedure.
+ * @param skipOpenLedgers
+ * Skip recovering open ledgers.
* @param cb
* RecoverCallback to invoke once all of the data on the dead
* bookie has been recovered and replicated.
* @param context
* Context for the RecoverCallback to call.
- * @param availableBookies
- * List of Bookie Servers that are available to use for
- * replicating data on the failed bookie. This could contain a
- * single bookie server if the user explicitly chose a bookie
- * server to replicate data to.
*/
- private void getActiveLedgers(final BookieSocketAddress bookieSrc, final BookieSocketAddress bookieDest,
- final RecoverCallback cb, final Object context, final List<BookieSocketAddress> availableBookies) {
+ private void getActiveLedgers(final Set<BookieSocketAddress> bookiesSrc, final boolean dryrun,
+ final boolean skipOpenLedgers, final RecoverCallback cb, final Object context) {
// Wrapper class around the RecoverCallback so it can be used
// as the final VoidCallback to process ledgers
class RecoverCallbackWrapper implements AsyncCallback.VoidCallback {
@@ -663,7 +613,7 @@ public class BookKeeperAdmin implements AutoCloseable {
Processor<Long> ledgerProcessor = new Processor<Long>() {
@Override
public void process(Long ledgerId, AsyncCallback.VoidCallback iterCallback) {
- recoverLedger(bookieSrc, ledgerId, iterCallback, availableBookies);
+ recoverLedger(bookiesSrc, ledgerId, dryrun, skipOpenLedgers, iterCallback);
}
};
bkc.getLedgerManager().asyncProcessLedgers(
@@ -672,41 +622,24 @@ public class BookKeeperAdmin implements AutoCloseable {
}
/**
- * Get a new random bookie, but ensure that it isn't one that is already
- * in the ensemble for the ledger.
- */
- private BookieSocketAddress getNewBookie(final List<BookieSocketAddress> bookiesAlreadyInEnsemble,
- final List<BookieSocketAddress> availableBookies)
- throws BKException.BKNotEnoughBookiesException {
- ArrayList<BookieSocketAddress> candidates = new ArrayList<BookieSocketAddress>();
- candidates.addAll(availableBookies);
- candidates.removeAll(bookiesAlreadyInEnsemble);
- if (candidates.size() == 0) {
- throw new BKException.BKNotEnoughBookiesException();
- }
- return candidates.get(rand.nextInt(candidates.size()));
- }
-
- /**
* This method asynchronously recovers a given ledger if any of the ledger
* entries were stored on the failed bookie.
*
- * @param bookieSrc
- * Source bookie that had a failure. We want to replicate the
+ * @param bookiesSrc
+ * Source bookies that had a failure. We want to replicate the
* ledger fragments that were stored there.
* @param lId
* Ledger id we want to recover.
- * @param ledgerIterCb
+ * @param dryrun
+ * printing the recovery plan without actually recovering bookies
+ * @param skipOpenLedgers
+ * Skip recovering open ledgers.
+ * @param finalLedgerIterCb
* IterationCallback to invoke once we've recovered the current
* ledger.
- * @param availableBookies
- * List of Bookie Servers that are available to use for
- * replicating data on the failed bookie. This could contain a
- * single bookie server if the user explicitly chose a bookie
- * server to replicate data to.
*/
- private void recoverLedger(final BookieSocketAddress bookieSrc, final long lId,
- final AsyncCallback.VoidCallback ledgerIterCb, final List<BookieSocketAddress> availableBookies) {
+ private void recoverLedger(final Set<BookieSocketAddress> bookiesSrc, final long lId, final boolean dryrun,
+ final boolean skipOpenLedgers, final AsyncCallback.VoidCallback finalLedgerIterCb) {
if (LOG.isDebugEnabled()) {
LOG.debug("Recovering ledger : {}", lId);
}
@@ -714,43 +647,73 @@ public class BookKeeperAdmin implements AutoCloseable {
asyncOpenLedgerNoRecovery(lId, new OpenCallback() {
@Override
public void openComplete(int rc, final LedgerHandle lh, Object ctx) {
- if (rc != Code.OK.intValue()) {
+ if (rc != BKException.Code.OK) {
LOG.error("BK error opening ledger: " + lId, BKException.create(rc));
- ledgerIterCb.processResult(rc, null, null);
+ finalLedgerIterCb.processResult(rc, null, null);
return;
}
LedgerMetadata lm = lh.getLedgerMetadata();
- if (!lm.isClosed() &&
- lm.getEnsembles().size() > 0) {
- Long lastKey = lm.getEnsembles().lastKey();
- ArrayList<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
- // the original write has not removed faulty bookie from
- // current ledger ensemble. to avoid data loss issue in
- // the case of concurrent updates to the ensemble composition,
- // the recovery tool should first close the ledger
- if (lastEnsemble.contains(bookieSrc)) {
- // close opened non recovery ledger handle
+ if (skipOpenLedgers && !lm.isClosed() && !lm.isInRecovery()) {
+ LOG.info("Skip recovering open ledger {}.", lId);
+ try {
+ lh.close();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BKException bke) {
+ LOG.warn("Error on closing ledger handle for {}.", lId);
+ }
+ finalLedgerIterCb.processResult(BKException.Code.OK, null, null);
+ return;
+ }
+
+ final boolean fenceRequired = !lm.isClosed() && containBookiesInLastEnsemble(lm, bookiesSrc);
+ // the original write has not removed faulty bookie from
+ // current ledger ensemble. to avoid data loss issue in
+ // the case of concurrent updates to the ensemble composition,
+ // the recovery tool should first close the ledger
+ if (!dryrun && fenceRequired) {
+ // close opened non recovery ledger handle
+ try {
+ lh.close();
+ } catch (Exception ie) {
+ LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ }
+ asyncOpenLedger(lId, new OpenCallback() {
+ @Override
+ public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
+ if (newrc != BKException.Code.OK) {
+ LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
+ finalLedgerIterCb.processResult(newrc, null, null);
+ return;
+ }
+ bkc.mainWorkerPool.submit(() -> {
+ // do recovery
+ recoverLedger(bookiesSrc, lId, dryrun, skipOpenLedgers, finalLedgerIterCb);
+ });
+ }
+ }, null);
+ return;
+ }
+
+ final AsyncCallback.VoidCallback ledgerIterCb = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (BKException.Code.OK != rc) {
+ LOG.error("Failed to recover ledger {} : {}", lId, rc);
+ } else {
+ LOG.info("Recovered ledger {}.", lId);
+ }
try {
lh.close();
- } catch (Exception ie) {
- LOG.warn("Error closing non recovery ledger handle for ledger " + lId, ie);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BKException bke) {
+ LOG.warn("Error on cloing ledger handle for {}.", lId);
}
- asyncOpenLedger(lId, new OpenCallback() {
- @Override
- public void openComplete(int newrc, final LedgerHandle newlh, Object newctx) {
- if (newrc != Code.OK.intValue()) {
- LOG.error("BK error close ledger: " + lId, BKException.create(newrc));
- ledgerIterCb.processResult(newrc, null, null);
- return;
- }
- // do recovery
- recoverLedger(bookieSrc, lId, ledgerIterCb, availableBookies);
- }
- }, null);
- return;
+ finalLedgerIterCb.processResult(rc, path, ctx);
}
- }
+ };
/*
* This List stores the ledger fragments to recover indexed by
@@ -773,7 +736,7 @@ public class BookKeeperAdmin implements AutoCloseable {
if (curEntryId != null)
ledgerFragmentsRange.put(curEntryId, entry.getKey() - 1);
curEntryId = entry.getKey();
- if (entry.getValue().contains(bookieSrc)) {
+ if (containBookies(entry.getValue(), bookiesSrc)) {
/*
* Current ledger fragment has entries stored on the
* dead bookie so we'll need to recover them.
@@ -797,6 +760,10 @@ public class BookKeeperAdmin implements AutoCloseable {
return;
}
+ if (dryrun) {
+ VERBOSE.info("Recovered ledger {} : {}", lId, (fenceRequired ? "[fence required]" : ""));
+ }
+
/*
* Multicallback for ledger. Once all fragments for the ledger have been recovered
* trigger the ledgerIterCb
@@ -810,47 +777,69 @@ public class BookKeeperAdmin implements AutoCloseable {
*/
for (final Long startEntryId : ledgerFragmentsToRecover) {
Long endEntryId = ledgerFragmentsRange.get(startEntryId);
- BookieSocketAddress newBookie = null;
+ ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsembles().get(startEntryId);
+ // Get bookies to replace
+ Map<Integer, BookieSocketAddress> targetBookieAddresses;
try {
- newBookie = getNewBookie(lh.getLedgerMetadata().getEnsembles().get(startEntryId),
- availableBookies);
- } catch (BKException.BKNotEnoughBookiesException bke) {
- ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException,
- null, null);
+ targetBookieAddresses = getReplacementBookies(lh, ensemble, bookiesSrc);
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ if (!dryrun) {
+ ledgerFragmentsMcb.processResult(BKException.Code.NotEnoughBookiesException, null, null);
+ } else {
+ VERBOSE.info(" Fragment [{} - {}] : {}", startEntryId, endEntryId,
+ BKException.getMessage(BKException.Code.NotEnoughBookiesException));
+ }
continue;
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Replicating fragment from [" + startEntryId
- + "," + endEntryId + "] of ledger " + lh.getId()
- + " to " + newBookie);
- }
- try {
- LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
- ledgerFragmentsMcb, lh, startEntryId, bookieSrc, newBookie);
- ArrayList<BookieSocketAddress> currentEnsemble = lh.getLedgerMetadata().getEnsemble(
- startEntryId);
- int bookieIndex = -1;
- if (null != currentEnsemble) {
- for (int i = 0; i < currentEnsemble.size(); i++) {
- if (currentEnsemble.get(i).equals(bookieSrc)) {
- bookieIndex = i;
- break;
- }
- }
+ if (dryrun) {
+ ArrayList<BookieSocketAddress> newEnsemble =
+ replaceBookiesInEnsemble(ensemble, targetBookieAddresses);
+ VERBOSE.info(" Fragment [{} - {}] : ", startEntryId, endEntryId);
+ VERBOSE.info(" old ensemble : {}", formatEnsemble(ensemble, bookiesSrc, '*'));
+ VERBOSE.info(" new ensemble : {}", formatEnsemble(newEnsemble, bookiesSrc, '*'));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Replicating fragment from [{}, {}] of ledger {} to {}",
+ startEntryId, endEntryId, lh.getId(), targetBookieAddresses);
+ }
+ try {
+ LedgerFragmentReplicator.SingleFragmentCallback cb = new LedgerFragmentReplicator.SingleFragmentCallback(
+ ledgerFragmentsMcb, lh, startEntryId, getReplacementBookiesMap(ensemble, targetBookieAddresses));
+ LedgerFragment ledgerFragment = new LedgerFragment(lh,
+ startEntryId, endEntryId, targetBookieAddresses.keySet());
+ asyncRecoverLedgerFragment(lh, ledgerFragment, cb, Sets.newHashSet(targetBookieAddresses.values()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
}
- LedgerFragment ledgerFragment = new LedgerFragment(lh,
- startEntryId, endEntryId, bookieIndex);
- asyncRecoverLedgerFragment(lh, ledgerFragment, cb, newBookie);
- } catch(InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
}
}
+ if (dryrun) {
+ ledgerIterCb.processResult(BKException.Code.OK, null, null);
+ }
}
}, null);
}
+ static String formatEnsemble(ArrayList<BookieSocketAddress> ensemble, Set<BookieSocketAddress> bookiesSrc, char marker) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (int i = 0; i < ensemble.size(); i++) {
+ sb.append(ensemble.get(i));
+ if (bookiesSrc.contains(ensemble.get(i))) {
+ sb.append(marker);
+ } else {
+ sb.append(' ');
+ }
+ if (i != ensemble.size() - 1) {
+ sb.append(", ");
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
/**
* This method asynchronously recovers a ledger fragment which is a
* contiguous portion of a ledger that was stored in an ensemble that
@@ -863,16 +852,81 @@ public class BookKeeperAdmin implements AutoCloseable {
* @param ledgerFragmentMcb
* - MultiCallback to invoke once we've recovered the current
* ledger fragment.
- * @param newBookie
- * - New bookie we want to use to recover and replicate the
+ * @param newBookies
+ * - New bookies we want to use to recover and replicate the
* ledger entries that were stored on the failed bookie.
*/
private void asyncRecoverLedgerFragment(final LedgerHandle lh,
final LedgerFragment ledgerFragment,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress newBookie)
- throws InterruptedException {
- lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookie);
+ final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+ lfr.replicate(lh, ledgerFragment, ledgerFragmentMcb, newBookies);
+ }
+
+ private Map<Integer, BookieSocketAddress> getReplacementBookies(
+ LedgerHandle lh,
+ List<BookieSocketAddress> ensemble,
+ Set<BookieSocketAddress> bookiesToRereplicate)
+ throws BKException.BKNotEnoughBookiesException {
+ Set<Integer> bookieIndexesToRereplicate = Sets.newHashSet();
+ for (int bookieIndex = 0; bookieIndex < ensemble.size(); bookieIndex++) {
+ BookieSocketAddress bookieInEnsemble = ensemble.get(bookieIndex);
+ if (bookiesToRereplicate.contains(bookieInEnsemble)) {
+ bookieIndexesToRereplicate.add(bookieIndex);
+ }
+ }
+ return getReplacementBookiesByIndexes(
+ lh, ensemble, bookieIndexesToRereplicate, Optional.of(bookiesToRereplicate));
+ }
+
+ private Map<Integer, BookieSocketAddress> getReplacementBookiesByIndexes(
+ LedgerHandle lh,
+ List<BookieSocketAddress> ensemble,
+ Set<Integer> bookieIndexesToRereplicate,
+ Optional<Set<BookieSocketAddress>> excludedBookies)
+ throws BKException.BKNotEnoughBookiesException {
+ // target bookies to replicate
+ Map<Integer, BookieSocketAddress> targetBookieAddresses =
+ Maps.newHashMapWithExpectedSize(bookieIndexesToRereplicate.size());
+ // bookies to exclude for ensemble allocation
+ Set<BookieSocketAddress> bookiesToExclude = Sets.newHashSet();
+ if (excludedBookies.isPresent()) {
+ bookiesToExclude.addAll(excludedBookies.get());
+ }
+
+ // excluding bookies that need to be replicated
+ for (Integer bookieIndex : bookieIndexesToRereplicate) {
+ BookieSocketAddress bookie = ensemble.get(bookieIndex);
+ bookiesToExclude.add(bookie);
+ }
+
+ // allocate bookies
+ for (Integer bookieIndex : bookieIndexesToRereplicate) {
+ BookieSocketAddress oldBookie = ensemble.get(bookieIndex);
+ BookieSocketAddress newBookie =
+ bkc.getPlacementPolicy().replaceBookie(
+ lh.getLedgerMetadata().getEnsembleSize(),
+ lh.getLedgerMetadata().getWriteQuorumSize(),
+ lh.getLedgerMetadata().getAckQuorumSize(),
+ lh.getLedgerMetadata().getCustomMetadata(),
+ ensemble,
+ oldBookie,
+ bookiesToExclude);
+ targetBookieAddresses.put(bookieIndex, newBookie);
+ bookiesToExclude.add(newBookie);
+ }
+
+ return targetBookieAddresses;
+ }
+
+ private ArrayList<BookieSocketAddress> replaceBookiesInEnsemble(
+ List<BookieSocketAddress> ensemble,
+ Map<Integer, BookieSocketAddress> replacedBookies) {
+ ArrayList<BookieSocketAddress> newEnsemble = Lists.newArrayList(ensemble);
+ for (Map.Entry<Integer, BookieSocketAddress> entry : replacedBookies.entrySet()) {
+ newEnsemble.set(entry.getKey(), entry.getValue());
+ }
+ return newEnsemble;
}
/**
@@ -882,20 +936,32 @@ public class BookKeeperAdmin implements AutoCloseable {
* - ledgerHandle
* @param ledgerFragment
* - LedgerFragment to replicate
- * @param targetBookieAddress
- * - target Bookie, to where entries should be replicated.
*/
public void replicateLedgerFragment(LedgerHandle lh,
+ final LedgerFragment ledgerFragment)
+ throws InterruptedException, BKException {
+ Optional<Set<BookieSocketAddress>> excludedBookies = Optional.empty();
+ Map<Integer, BookieSocketAddress> targetBookieAddresses =
+ getReplacementBookiesByIndexes(lh, ledgerFragment.getEnsemble(),
+ ledgerFragment.getBookiesIndexes(), excludedBookies);
+ replicateLedgerFragment(lh, ledgerFragment, targetBookieAddresses);
+ }
+
+ private void replicateLedgerFragment(LedgerHandle lh,
final LedgerFragment ledgerFragment,
- final BookieSocketAddress targetBookieAddress)
+ final Map<Integer, BookieSocketAddress> targetBookieAddresses)
throws InterruptedException, BKException {
CompletableFuture<Void> result = new CompletableFuture<>();
ResultCallBack resultCallBack = new ResultCallBack(result);
- SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack,
- lh, ledgerFragment.getFirstEntryId(), ledgerFragment
- .getAddress(), targetBookieAddress);
+ SingleFragmentCallback cb = new SingleFragmentCallback(
+ resultCallBack,
+ lh,
+ ledgerFragment.getFirstEntryId(),
+ getReplacementBookiesMap(ledgerFragment, targetBookieAddresses));
- asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress);
+ Set<BookieSocketAddress> targetBookieSet = Sets.newHashSet();
+ targetBookieSet.addAll(targetBookieAddresses.values());
+ asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieSet);
try {
SyncCallbackUtils.waitForResult(result);
@@ -903,6 +969,52 @@ public class BookKeeperAdmin implements AutoCloseable {
throw BKException.create(bkc.getReturnRc(err.getCode()));
}
}
+
+ private static Map<BookieSocketAddress, BookieSocketAddress> getReplacementBookiesMap(
+ ArrayList<BookieSocketAddress> ensemble,
+ Map<Integer, BookieSocketAddress> targetBookieAddresses) {
+ Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
+ new HashMap<BookieSocketAddress, BookieSocketAddress>();
+ for (Map.Entry<Integer, BookieSocketAddress> entry : targetBookieAddresses.entrySet()) {
+ BookieSocketAddress oldBookie = ensemble.get(entry.getKey());
+ BookieSocketAddress newBookie = entry.getValue();
+ bookiesMap.put(oldBookie, newBookie);
+ }
+ return bookiesMap;
+ }
+
+ private static Map<BookieSocketAddress, BookieSocketAddress> getReplacementBookiesMap(
+ LedgerFragment ledgerFragment,
+ Map<Integer, BookieSocketAddress> targetBookieAddresses) {
+ Map<BookieSocketAddress, BookieSocketAddress> bookiesMap =
+ new HashMap<BookieSocketAddress, BookieSocketAddress>();
+ for (Integer bookieIndex : ledgerFragment.getBookiesIndexes()) {
+ BookieSocketAddress oldBookie = ledgerFragment.getAddress(bookieIndex);
+ BookieSocketAddress newBookie = targetBookieAddresses.get(bookieIndex);
+ bookiesMap.put(oldBookie, newBookie);
+ }
+ return bookiesMap;
+ }
+
+ private static boolean containBookiesInLastEnsemble(LedgerMetadata lm,
+ Set<BookieSocketAddress> bookies) {
+ if (lm.getEnsembles().size() <= 0) {
+ return false;
+ }
+ Long lastKey = lm.getEnsembles().lastKey();
+ ArrayList<BookieSocketAddress> lastEnsemble = lm.getEnsembles().get(lastKey);
+ return containBookies(lastEnsemble, bookies);
+ }
+
+ private static boolean containBookies(ArrayList<BookieSocketAddress> ensemble,
+ Set<BookieSocketAddress> bookies) {
+ for (BookieSocketAddress bookie : ensemble) {
+ if (bookies.contains(bookie)) {
+ return true;
+ }
+ }
+ return false;
+ }
/** This is the class for getting the replication result */
static class ResultCallBack implements AsyncCallback.VoidCallback {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
index a90a366..2ad8076 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerChecker.java
@@ -20,15 +20,14 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -81,39 +80,125 @@ public class LedgerChecker {
}
}
+ /**
+ * This will collect the bad bookies inside a ledger fragment.
+ */
+ private static class LedgerFragmentCallback implements GenericCallback<LedgerFragment> {
+
+ private final LedgerFragment fragment;
+ private final int bookieIndex;
+ // bookie index -> return code
+ private final Map<Integer, Integer> badBookies;
+ private final AtomicInteger numBookies;
+ private final GenericCallback<LedgerFragment> cb;
+
+ LedgerFragmentCallback(LedgerFragment lf,
+ int bookieIndex,
+ GenericCallback<LedgerFragment> cb,
+ Map<Integer, Integer> badBookies,
+ AtomicInteger numBookies) {
+ this.fragment = lf;
+ this.bookieIndex = bookieIndex;
+ this.cb = cb;
+ this.badBookies = badBookies;
+ this.numBookies = numBookies;
+ }
+
+ @Override
+ public void operationComplete(int rc, LedgerFragment lf) {
+ if (BKException.Code.OK != rc) {
+ synchronized (badBookies) {
+ badBookies.put(bookieIndex, rc);
+ }
+ }
+ if (numBookies.decrementAndGet() == 0) {
+ if (badBookies.isEmpty()) {
+ cb.operationComplete(BKException.Code.OK, fragment);
+ } else {
+ int rcToReturn = BKException.Code.NoBookieAvailableException;
+ for (Map.Entry<Integer, Integer> entry : badBookies.entrySet()) {
+ rcToReturn = entry.getValue();
+ if (entry.getValue() == BKException.Code.ClientClosedException) {
+ break;
+ }
+ }
+ cb.operationComplete(rcToReturn,
+ fragment.subset(badBookies.keySet()));
+ }
+ }
+ }
+ }
+
public LedgerChecker(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}
+ /**
+ * Verify a ledger fragment to collect bad bookies
+ *
+ * @param fragment
+ * fragment to verify
+ * @param cb
+ * callback
+ * @throws InvalidFragmentException
+ */
+ private void verifyLedgerFragment(LedgerFragment fragment,
+ GenericCallback<LedgerFragment> cb)
+ throws InvalidFragmentException, BKException {
+ Set<Integer> bookiesToCheck = fragment.getBookiesIndexes();
+ if (bookiesToCheck.isEmpty()) {
+ cb.operationComplete(BKException.Code.OK, fragment);
+ return;
+ }
+
+ AtomicInteger numBookies = new AtomicInteger(bookiesToCheck.size());
+ Map<Integer, Integer> badBookies = new HashMap<Integer, Integer>();
+ for (Integer bookieIndex : bookiesToCheck) {
+ LedgerFragmentCallback lfCb = new LedgerFragmentCallback(
+ fragment, bookieIndex, cb, badBookies, numBookies);
+ verifyLedgerFragment(fragment, bookieIndex, lfCb);
+ }
+ }
+
+ /**
+ * Verify a bookie inside a ledger fragment.
+ *
+ * @param fragment
+ * ledger fragment
+ * @param bookieIndex
+ * bookie index in the fragment
+ * @param cb
+ * callback
+ * @throws InvalidFragmentException
+ */
private void verifyLedgerFragment(LedgerFragment fragment,
- GenericCallback<LedgerFragment> cb) throws InvalidFragmentException {
- long firstStored = fragment.getFirstStoredEntryId();
- long lastStored = fragment.getLastStoredEntryId();
-
- // because of this if block, even if the bookie of the fragment is
- // down, it considers Fragment is available/not-bad if firstStored
- // and lastStored are LedgerHandle.INVALID_ENTRY_ID.
- // So same logic is used in BookieShell.DecommissionBookieCmd.areEntriesOfSegmentStoredInTheBookie
- // if any change is made here, then the changes should be in BookieShell also
+ int bookieIndex,
+ GenericCallback<LedgerFragment> cb)
+ throws InvalidFragmentException {
+ long firstStored = fragment.getFirstStoredEntryId(bookieIndex);
+ long lastStored = fragment.getLastStoredEntryId(bookieIndex);
+
+ BookieSocketAddress bookie = fragment.getAddress(bookieIndex);
+ if (null == bookie) {
+ throw new InvalidFragmentException();
+ }
+
if (firstStored == LedgerHandle.INVALID_ENTRY_ID) {
+ // this fragment is not on this bookie
if (lastStored != LedgerHandle.INVALID_ENTRY_ID) {
throw new InvalidFragmentException();
}
cb.operationComplete(BKException.Code.OK, fragment);
- return;
- }
- if (firstStored == lastStored) {
+ } else if (firstStored == lastStored) {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(1,
fragment, cb);
- bookieClient.readEntry(fragment.getAddress(), fragment
+ bookieClient.readEntry(bookie, fragment
.getLedgerId(), firstStored, manycb, null);
} else {
ReadManyEntriesCallback manycb = new ReadManyEntriesCallback(2,
fragment, cb);
- bookieClient.readEntry(fragment.getAddress(), fragment
- .getLedgerId(), firstStored, manycb, null);
- bookieClient.readEntry(fragment.getAddress(), fragment
- .getLedgerId(), lastStored, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(), firstStored, manycb, null);
+ bookieClient.readEntry(bookie, fragment.getLedgerId(), lastStored, manycb, null);
}
}
@@ -191,44 +276,49 @@ public class LedgerChecker {
for (Map.Entry<Long, ArrayList<BookieSocketAddress>> e : lh
.getLedgerMetadata().getEnsembles().entrySet()) {
if (curEntryId != null) {
+ Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
- fragments.add(new LedgerFragment(lh, curEntryId,
- e.getKey() - 1, i));
+ bookieIndexes.add(i);
}
+ fragments.add(new LedgerFragment(lh, curEntryId,
+ e.getKey() - 1, bookieIndexes));
}
curEntryId = e.getKey();
curEnsemble = e.getValue();
}
- /* Checking the last fragment of the ledger can be complicated in some cases.
+
+
+
+ /* Checking the last segment of the ledger can be complicated in some cases.
* In the case that the ledger is closed, we can just check the fragments of
- * the ledger as normal, except in the case that no entry was ever written,
- * to the ledger, in which case we check no fragments.
+ * the segment as normal even if no data has ever been written to it.
* In the case that the ledger is open, but enough entries have been written,
- * for lastAddConfirmed to be set above the start entry of the fragment, we
+ * for lastAddConfirmed to be set above the start entry of the segment, we
* can also check as normal.
- * However, if lastAddConfirmed cannot be trusted, such as when it's lower than
- * the first entry id, or not set at all, we cannot be sure if there has been
- * data written to the fragment. For this reason, we have to send a read request
+ * However, if ledger is open, sometimes lastAddConfirmed cannot be trusted,
+ * such as when it's lower than the first entry id, or not set at all,
+ * we cannot be sure if there has been data written to the segment.
+ * For this reason, we have to send a read request
* to the bookies which should have the first entry. If they respond with
* NoSuchEntry we can assume it was never written. If they respond with anything
* else, we must assume the entry has been written, so we run the check.
*/
- if (curEntryId != null && !(lh.getLedgerMetadata().isClosed() && lh.getLastAddConfirmed() < curEntryId)) {
+ if (curEntryId != null) {
long lastEntry = lh.getLastAddConfirmed();
- if (lastEntry < curEntryId) {
+ if (!lh.isClosed() && lastEntry < curEntryId) {
lastEntry = curEntryId;
}
- final Set<LedgerFragment> finalFragments = new HashSet<LedgerFragment>();
+ Set<Integer> bookieIndexes = new HashSet<Integer>();
for (int i = 0; i < curEnsemble.size(); i++) {
- finalFragments.add(new LedgerFragment(lh, curEntryId,
- lastEntry, i));
+ bookieIndexes.add(i);
}
+ final LedgerFragment lastLedgerFragment = new LedgerFragment(lh, curEntryId,
+ lastEntry, bookieIndexes);
- // Check for the case that no last confirmed entry has
- // been set.
+ // Check for the case that no last confirmed entry has been set
if (curEntryId == lastEntry) {
final long entryToRead = curEntryId;
@@ -237,7 +327,7 @@ public class LedgerChecker {
new GenericCallback<Boolean>() {
public void operationComplete(int rc, Boolean result) {
if (result) {
- fragments.addAll(finalFragments);
+ fragments.add(lastLedgerFragment);
}
checkFragments(fragments, cb);
}
@@ -250,7 +340,7 @@ public class LedgerChecker {
}
return;
} else {
- fragments.addAll(finalFragments);
+ fragments.add(lastLedgerFragment);
}
}
@@ -275,7 +365,10 @@ public class LedgerChecker {
LOG.error("Invalid fragment found : {}", r);
allFragmentsCb.operationComplete(
BKException.Code.IncorrectParameterException, r);
+ } catch (BKException e) {
+ LOG.error("BKException when checking fragment : {}", r, e);
}
}
}
+
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
index e12f77f..3f02355 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragment.java
@@ -20,19 +20,20 @@
package org.apache.bookkeeper.client;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.SortedMap;
-
import org.apache.bookkeeper.net.BookieSocketAddress;
/**
- * Represents the entries of a segment of a ledger which are stored on a single
- * bookie in the segments bookie ensemble.
- *
+ * Represents the entries of a segment of a ledger which are stored on a subset of
+ * bookies in the segment's bookie ensemble.
+ *
* Used for checking and recovery
*/
public class LedgerFragment {
- private final int bookieIndex;
+ private final Set<Integer> bookieIndexes;
private final List<BookieSocketAddress> ensemble;
private final long firstEntryId;
private final long lastKnownEntryId;
@@ -40,12 +41,14 @@ public class LedgerFragment {
private final DistributionSchedule schedule;
private final boolean isLedgerClosed;
- LedgerFragment(LedgerHandle lh, long firstEntryId,
- long lastKnownEntryId, int bookieIndex) {
+ LedgerFragment(LedgerHandle lh,
+ long firstEntryId,
+ long lastKnownEntryId,
+ Set<Integer> bookieIndexes) {
this.ledgerId = lh.getId();
this.firstEntryId = firstEntryId;
this.lastKnownEntryId = lastKnownEntryId;
- this.bookieIndex = bookieIndex;
+ this.bookieIndexes = bookieIndexes;
this.ensemble = lh.getLedgerMetadata().getEnsemble(firstEntryId);
this.schedule = lh.getDistributionSchedule();
SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = lh
@@ -54,6 +57,27 @@ public class LedgerFragment {
|| !ensemble.equals(ensembles.get(ensembles.lastKey()));
}
+ LedgerFragment(LedgerFragment lf, Set<Integer> subset) {
+ this.ledgerId = lf.ledgerId;
+ this.firstEntryId = lf.firstEntryId;
+ this.lastKnownEntryId = lf.lastKnownEntryId;
+ this.bookieIndexes = subset;
+ this.ensemble = lf.ensemble;
+ this.schedule = lf.schedule;
+ this.isLedgerClosed = lf.isLedgerClosed;
+ }
+
+ /**
+ * Return a ledger fragment that contains only the given subset of bookies.
+ *
+ * @param subset
+ * indexes of the subset of bookies in the ensemble.
+ * @return a ledger fragment that contains only the given subset of bookies
+ */
+ public LedgerFragment subset(Set<Integer> subset) {
+ return new LedgerFragment(this, subset);
+ }
+
/**
* Returns true, if and only if the ledger fragment will never be modified
* by any of the clients in future, otherwise false. i.e,
@@ -83,23 +107,51 @@ public class LedgerFragment {
/**
* Gets the failedBookie address
*/
- public BookieSocketAddress getAddress() {
+ public BookieSocketAddress getAddress(int bookieIndex) {
return ensemble.get(bookieIndex);
}
-
+
+ public Set<BookieSocketAddress> getAddresses() {
+ Set<BookieSocketAddress> addresses = new HashSet<BookieSocketAddress>();
+ for (int bookieIndex : bookieIndexes) {
+ addresses.add(ensemble.get(bookieIndex));
+ }
+ return addresses;
+ }
+
/**
* Gets the failedBookie index
*/
- public int getBookiesIndex() {
- return bookieIndex;
+ public Set<Integer> getBookiesIndexes() {
+ return bookieIndexes;
}
/**
- * Gets the first stored entry id of the fragment in failed bookie.
- *
+ * Gets the first stored entry id of the fragment in failed bookies.
+ *
* @return entryId
*/
public long getFirstStoredEntryId() {
+ Long firstEntry = null;
+ for (int bookieIndex : bookieIndexes) {
+ Long firstStoredEntryForBookie = getFirstStoredEntryId(bookieIndex);
+ if (null == firstEntry) {
+ firstEntry = firstStoredEntryForBookie;
+ } else if (null != firstStoredEntryForBookie) {
+ firstEntry = Math.min(firstEntry, firstStoredEntryForBookie);
+ }
+ }
+ return null == firstEntry ? LedgerHandle.INVALID_ENTRY_ID : firstEntry;
+ }
+
+ /**
+ * Get the first stored entry id of the fragment in the given failed bookie.
+ *
+ * @param bookieIndex
+ * the bookie index in the ensemble.
+ * @return first stored entry id on the bookie.
+ */
+ public Long getFirstStoredEntryId(int bookieIndex) {
long firstEntry = firstEntryId;
for (int i = 0; i < ensemble.size() && firstEntry <= lastKnownEntryId; i++) {
@@ -114,10 +166,30 @@ public class LedgerFragment {
/**
* Gets the last stored entry id of the fragment in failed bookie.
- *
+ *
* @return entryId
*/
public long getLastStoredEntryId() {
+ Long lastEntry = null;
+ for (int bookieIndex : bookieIndexes) {
+ Long lastStoredEntryIdForBookie = getLastStoredEntryId(bookieIndex);
+ if (null == lastEntry) {
+ lastEntry = lastStoredEntryIdForBookie;
+ } else if (null != lastStoredEntryIdForBookie) {
+ lastEntry = Math.max(lastEntry, lastStoredEntryIdForBookie);
+ }
+ }
+ return null == lastEntry ? LedgerHandle.INVALID_ENTRY_ID : lastEntry;
+ }
+
+ /**
+ * Get the last stored entry id of the fragment in the given failed bookie.
+ *
+ * @param bookieIndex
+ * the bookie index in the ensemble.
+ * @return last stored entry id on the bookie.
+ */
+ public Long getLastStoredEntryId(int bookieIndex) {
long lastEntry = lastKnownEntryId;
for (int i = 0; i < ensemble.size() && lastEntry >= firstEntryId; i++) {
if (schedule.hasEntry(lastEntry, bookieIndex)) {
@@ -131,7 +203,7 @@ public class LedgerFragment {
/**
* Gets the ensemble of fragment
- *
+ *
* @return the ensemble for the segment which this fragment is a part of
*/
public List<BookieSocketAddress> getEnsemble() {
@@ -143,6 +215,6 @@ public class LedgerFragment {
return String.format("Fragment(LedgerID: %d, FirstEntryID: %d[%d], "
+ "LastKnownEntryID: %d[%d], Host: %s, Closed: %s)", ledgerId, firstEntryId,
getFirstStoredEntryId(), lastKnownEntryId, getLastStoredEntryId(),
- getAddress(), isLedgerClosed);
+ getAddresses(), isLedgerClosed);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 172f9ec..95c48d5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -19,17 +19,20 @@
*/
package org.apache.bookkeeper.client;
+import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -79,7 +82,7 @@ public class LedgerFragmentReplicator {
private void replicateFragmentInternal(final LedgerHandle lh,
final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress newBookie) throws InterruptedException {
+ final Set<BookieSocketAddress> newBookies) throws InterruptedException {
if (!lf.isClosed()) {
LOG.error("Trying to replicate an unclosed fragment;"
+ " This is not safe {}", lf);
@@ -94,13 +97,13 @@ public class LedgerFragmentReplicator {
* Ideally this should never happen if bookie failure is taken care
* of properly. Nothing we can do though in this case.
*/
- LOG.warn("Dead bookie (" + lf.getAddress()
+ LOG.warn("Dead bookie (" + lf.getAddresses()
+ ") is still part of the current"
+ " active ensemble for ledgerId: " + lh.getId());
ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
return;
}
- if (startEntryId > endEntryId) {
+ if (startEntryId > endEntryId || endEntryId <= INVALID_ENTRY_ID) {
// for open ledger which there is no entry, the start entry id is 0,
// the end entry id is -1.
// we can return immediately to trigger forward read
@@ -126,7 +129,7 @@ public class LedgerFragmentReplicator {
BKException.Code.LedgerRecoveryException);
for (final Long entryId : entriesToReplicate) {
recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
- newBookie);
+ newBookies);
}
}
@@ -146,27 +149,27 @@ public class LedgerFragmentReplicator {
* @param ledgerFragmentMcb
* MultiCallback to invoke once we've recovered the current
* ledger fragment.
- * @param targetBookieAddress
- * New bookie we want to use to recover and replicate the ledger
+ * @param targetBookieAddresses
+ * New bookies we want to use to recover and replicate the ledger
* entries that were stored on the failed bookie.
*/
void replicate(final LedgerHandle lh, final LedgerFragment lf,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress targetBookieAddress)
+ final Set<BookieSocketAddress> targetBookieAddresses)
throws InterruptedException {
Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf,
bkc.getConf().getRereplicationEntryBatchSize());
- LOG.info("Fragment :" + lf + " is split into sub fragments :"
- + partionedFragments);
+ LOG.info("Replicating fragment {} in {} sub fragments.",
+ lf, partionedFragments.size());
replicateNextBatch(lh, partionedFragments.iterator(),
- ledgerFragmentMcb, targetBookieAddress);
+ ledgerFragmentMcb, targetBookieAddresses);
}
/** Replicate the batched entry fragments one after other */
private void replicateNextBatch(final LedgerHandle lh,
final Iterator<LedgerFragment> fragments,
final AsyncCallback.VoidCallback ledgerFragmentMcb,
- final BookieSocketAddress targetBookieAddress) {
+ final Set<BookieSocketAddress> targetBookieAddresses) {
if (fragments.hasNext()) {
try {
replicateFragmentInternal(lh, fragments.next(),
@@ -179,11 +182,11 @@ public class LedgerFragmentReplicator {
} else {
replicateNextBatch(lh, fragments,
ledgerFragmentMcb,
- targetBookieAddress);
+ targetBookieAddresses);
}
}
- }, targetBookieAddress);
+ }, targetBookieAddresses);
} catch (InterruptedException e) {
ledgerFragmentMcb.processResult(
BKException.Code.InterruptedException, null, null);
@@ -224,7 +227,7 @@ public class LedgerFragmentReplicator {
for (int i = 0; i < splitsWithFullEntries; i++) {
fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1;
fragments.add(new LedgerFragment(lh, firstEntryId,
- fragmentSplitLastEntry, ledgerFragment.getBookiesIndex()));
+ fragmentSplitLastEntry, ledgerFragment.getBookiesIndexes()));
firstEntryId = fragmentSplitLastEntry + 1;
}
@@ -233,7 +236,7 @@ public class LedgerFragmentReplicator {
if (lastSplitWithPartialEntries > 0) {
fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId
+ lastSplitWithPartialEntries - 1, ledgerFragment
- .getBookiesIndex()));
+ .getBookiesIndexes()));
}
return fragments;
}
@@ -242,7 +245,7 @@ public class LedgerFragmentReplicator {
* This method asynchronously recovers a specific ledger entry by reading
* the values via the BookKeeper Client (which would read it from the other
* replicas) and then writing it to the chosen new bookie.
- *
+ *
* @param entryId
* Ledger Entry ID to recover.
* @param lh
@@ -250,14 +253,41 @@ public class LedgerFragmentReplicator {
* @param ledgerFragmentEntryMcb
* MultiCallback to invoke once we've recovered the current
* ledger entry.
- * @param newBookie
- * New bookie we want to use to recover and replicate the ledger
+ * @param newBookies
+ * New bookies we want to use to recover and replicate the ledger
* entries that were stored on the failed bookie.
*/
private void recoverLedgerFragmentEntry(final Long entryId,
final LedgerHandle lh,
final AsyncCallback.VoidCallback ledgerFragmentEntryMcb,
- final BookieSocketAddress newBookie) throws InterruptedException {
+ final Set<BookieSocketAddress> newBookies) throws InterruptedException {
+ final AtomicInteger numCompleted = new AtomicInteger(0);
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ final WriteCallback multiWriteCallback = new WriteCallback() {
+ @Override
+ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) {
+ if (rc != BKException.Code.OK) {
+ LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}",
+ new Object[] { ledgerId, entryId, addr, BKException.create(rc) });
+ if (completed.compareAndSet(false, true)) {
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ }
+ } else {
+ numEntriesWritten.inc();
+ if (ctx instanceof Long) {
+ numBytesWritten.registerSuccessfulValue((Long) ctx);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!",
+ new Object[] { ledgerId, entryId, addr });
+ }
+ if (numCompleted.incrementAndGet() == newBookies.size() &&
+ completed.compareAndSet(false, true)) {
+ ledgerFragmentEntryMcb.processResult(rc, null, null);
+ }
+ }
+ }
+ };
/*
* Read the ledger entry using the LedgerHandle. This will allow us to
* read the entry from one of the other replicated bookies other than
@@ -286,42 +316,16 @@ public class LedgerFragmentReplicator {
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
- bkc.getBookieClient().addEntry(newBookie, lh.getId(),
- lh.getLedgerKey(), entryId, toSend,
- new WriteCallback() {
- @Override
- public void writeComplete(int rc, long ledgerId,
- long entryId, BookieSocketAddress addr,
- Object ctx) {
- if (rc != BKException.Code.OK) {
- LOG.error(
- "BK error writing entry for ledgerId: "
- + ledgerId + ", entryId: "
- + entryId + ", bookie: "
- + addr, BKException
- .create(rc));
- } else {
- numEntriesWritten.inc();
- numBytesWritten.registerSuccessfulValue(dataLength);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Success writing ledger id "
- + ledgerId + ", entry id "
- + entryId + " to a new bookie "
- + addr + "!");
- }
- }
- /*
- * Pass the return code result up the chain with
- * the parent callback.
- */
- ledgerFragmentEntryMcb.processResult(rc, null,
- null);
- }
- }, null, BookieProtocol.FLAG_RECOVERY_ADD);
+ for (BookieSocketAddress newBookie : newBookies) {
+ bkc.getBookieClient().addEntry(newBookie, lh.getId(),
+ lh.getLedgerKey(), entryId, toSend.retainedSlice(),
+ multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD);
+ }
+ toSend.release();
}
}, null);
}
-
+
/**
* Callback for recovery of a single ledger fragment. Once the fragment has
* had all entries replicated, update the ensemble in zookeeper. Once
@@ -332,17 +336,15 @@ public class LedgerFragmentReplicator {
final AsyncCallback.VoidCallback ledgerFragmentsMcb;
final LedgerHandle lh;
final long fragmentStartId;
- final BookieSocketAddress oldBookie;
- final BookieSocketAddress newBookie;
+ final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb,
LedgerHandle lh, long fragmentStartId,
- BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
+ Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
this.ledgerFragmentsMcb = ledgerFragmentsMcb;
this.lh = lh;
this.fragmentStartId = fragmentStartId;
- this.newBookie = newBookie;
- this.oldBookie = oldBookie;
+ this.oldBookie2NewBookie = oldBookie2NewBookie;
}
@Override
@@ -353,39 +355,32 @@ public class LedgerFragmentReplicator {
ledgerFragmentsMcb.processResult(rc, null, null);
return;
}
- updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh,
- oldBookie, newBookie);
+ updateEnsembleInfo(ledgerFragmentsMcb, fragmentStartId, lh, oldBookie2NewBookie);
}
}
/** Updates the ensemble with newBookie and notify the ensembleUpdatedCb */
private static void updateEnsembleInfo(
AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId,
- LedgerHandle lh, BookieSocketAddress oldBookie,
- BookieSocketAddress newBookie) {
+ LedgerHandle lh, Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
/*
* Update the ledger metadata's ensemble info to point to the new
* bookie.
*/
ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata()
.getEnsembles().get(fragmentStartId);
- int deadBookieIndex = ensemble.indexOf(oldBookie);
-
- /*
- * An update to the ensemble info might happen after re-reading ledger metadata.
- * Such an update might reflect a change to the ensemble membership such that
- * it might not be necessary to replace the bookie.
- */
- if (deadBookieIndex >= 0) {
- ensemble.remove(deadBookieIndex);
- ensemble.add(deadBookieIndex, newBookie);
- lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
- fragmentStartId, lh, oldBookie, newBookie));
- }
- else {
- LOG.warn("Bookie {} doesn't exist in ensemble {} anymore.", oldBookie, ensemble);
- ensembleUpdatedCb.processResult(BKException.Code.UnexpectedConditionException, null, null);
+ for (Map.Entry<BookieSocketAddress, BookieSocketAddress> entry : oldBookie2NewBookie.entrySet()) {
+ int deadBookieIndex = ensemble.indexOf(entry.getKey());
+ // The ensemble update might happen after re-reading the ledger metadata, so the ensemble might
+ // already have changed. If so, skip replacing a bookie that no longer exists in the ensemble.
+ if (deadBookieIndex >= 0) {
+ ensemble.set(deadBookieIndex, entry.getValue());
+ } else {
+ LOG.info("Bookie {} doesn't exist in ensemble {} anymore.", entry.getKey(), ensemble);
+ }
}
+ lh.writeLedgerConfig(new UpdateEnsembleCb(ensembleUpdatedCb,
+ fragmentStartId, lh, oldBookie2NewBookie));
}
/**
@@ -397,17 +392,15 @@ public class LedgerFragmentReplicator {
final AsyncCallback.VoidCallback ensembleUpdatedCb;
final LedgerHandle lh;
final long fragmentStartId;
- final BookieSocketAddress oldBookie;
- final BookieSocketAddress newBookie;
+ final Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie;
public UpdateEnsembleCb(AsyncCallback.VoidCallback ledgerFragmentsMcb,
long fragmentStartId, LedgerHandle lh,
- BookieSocketAddress oldBookie, BookieSocketAddress newBookie) {
+ Map<BookieSocketAddress, BookieSocketAddress> oldBookie2NewBookie) {
this.ensembleUpdatedCb = ledgerFragmentsMcb;
this.lh = lh;
this.fragmentStartId = fragmentStartId;
- this.newBookie = newBookie;
- this.oldBookie = oldBookie;
+ this.oldBookie2NewBookie = oldBookie2NewBookie;
}
@Override
@@ -418,9 +411,8 @@ public class LedgerFragmentReplicator {
// try again, the previous success (with which this has
// conflicted) will have updated the stat other operations
// such as (addEnsemble) would update it too.
- lh
- .rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
- lh.bk.getMainWorkerPool(), lh.getId()) {
+ lh.rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(
+ lh.bk.mainWorkerPool, lh.getId()) {
@Override
public void safeOperationComplete(int rc,
LedgerMetadata newMeta) {
@@ -433,8 +425,7 @@ public class LedgerFragmentReplicator {
} else {
lh.metadata = newMeta;
updateEnsembleInfo(ensembleUpdatedCb,
- fragmentStartId, lh, oldBookie,
- newBookie);
+ fragmentStartId, lh, oldBookie2NewBookie);
}
}
@Override
@@ -449,8 +440,8 @@ public class LedgerFragmentReplicator {
} else {
LOG.info("Updated ZK for ledgerId: (" + lh.getId() + " : "
+ fragmentStartId
- + ") to point ledger fragments from old dead bookie: ("
- + oldBookie + ") to new bookie: (" + newBookie + ")");
+ + ") to point ledger fragments from old bookies to new bookies: "
+ + oldBookie2NewBookie);
}
/*
* Pass the return code result up the chain with the parent
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 10fb329..787540e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -115,10 +115,13 @@ public class ServerConfiguration extends AbstractConfiguration {
protected final static String DISK_USAGE_WARN_THRESHOLD = "diskUsageWarnThreshold";
protected final static String DISK_USAGE_LWM_THRESHOLD = "diskUsageLwmThreshold";
protected final static String DISK_CHECK_INTERVAL = "diskCheckInterval";
+
+ // Replication parameters
protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
protected final static String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
protected final static String AUTO_RECOVERY_DAEMON_ENABLED = "autoRecoveryDaemonEnabled";
protected final static String LOST_BOOKIE_RECOVERY_DELAY = "lostBookieRecoveryDelay";
+ protected final static String RW_REREPLICATE_BACKOFF_MS = "rwRereplicateBackoffMs";
// Worker Thread parameters.
protected final static String NUM_ADD_WORKER_THREADS = "numAddWorkerThreads";
@@ -1760,6 +1763,24 @@ public class ServerConfiguration extends AbstractConfiguration {
}
/**
+ * Get how long to backoff when encountering exception on rereplicating a ledger.
+ *
+ * @return backoff time in milliseconds
+ */
+ public int getRwRereplicateBackoffMs() {
+ return getInt(RW_REREPLICATE_BACKOFF_MS, 5000);
+ }
+
+ /**
+ * Set how long to backoff when encountering exception on rereplicating a ledger.
+ *
+ * @param backoffMs backoff time in milliseconds
+ */
+ public void setRwRereplicateBackoffMs(int backoffMs) {
+ setProperty(RW_REREPLICATE_BACKOFF_MS, backoffMs);
+ }
+
+ /**
* Sets that whether force start a bookie in readonly mode
*
* @param enabled
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index ab35b41..4f84d03 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -20,6 +20,10 @@
*/
package org.apache.bookkeeper.replication;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -33,7 +37,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -63,11 +66,6 @@ import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
-
/**
* Auditor is a single entity in the entire Bookie cluster and will be watching
* all the bookies under 'ledgerrootpath/available' zkpath. When any of the
@@ -570,7 +568,7 @@ public class Auditor implements BookiesListener {
if (rc == BKException.Code.OK) {
Set<BookieSocketAddress> bookies = Sets.newHashSet();
for (LedgerFragment f : fragments) {
- bookies.add(f.getAddress());
+ bookies.addAll(f.getAddresses());
}
for (BookieSocketAddress bookie : bookies) {
publishSuspectedLedgers(bookie.toString(), Sets.newHashSet(lh.getId()));
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index 9c3c019..5caac6f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -20,14 +20,12 @@
*/
package org.apache.bookkeeper.replication;
+import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Set;
-
-import com.google.common.annotations.VisibleForTesting;
-
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.ExitCode;
@@ -105,8 +103,7 @@ public class AutoRecoveryMain {
.build();
auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
zk, statsLogger.scope(AUDITOR_SCOPE));
- replicationWorker = new ReplicationWorker(zk, conf,
- Bookie.getBookieAddress(conf), statsLogger.scope(REPLICATION_WORKER_SCOPE));
+ replicationWorker = new ReplicationWorker(zk, conf, statsLogger.scope(REPLICATION_WORKER_SCOPE));
deathWatcher = new AutoRecoveryDeathWatcher(this);
}
@@ -115,7 +112,7 @@ public class AutoRecoveryMain {
this.conf = conf;
this.zk = zk;
auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf, zk);
- replicationWorker = new ReplicationWorker(zk, conf, Bookie.getBookieAddress(conf));
+ replicationWorker = new ReplicationWorker(zk, conf);
deathWatcher = new AutoRecoveryDeathWatcher(this);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
index fb7de20..0b48e33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationStats.java
@@ -43,6 +43,7 @@ public interface ReplicationStats {
public final static String NUM_BYTES_READ = "NUM_BYTES_READ";
public final static String NUM_ENTRIES_WRITTEN = "NUM_ENTRIES_WRITTEN";
public final static String NUM_BYTES_WRITTEN = "NUM_BYTES_WRITTEN";
+ public final static String REPLICATE_EXCEPTION = "exceptions";
public final static String BK_CLIENT_SCOPE = "bk_client";
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index 98960b0..61205f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -19,8 +19,10 @@
*/
package org.apache.bookkeeper.replication;
+import com.google.common.base.Stopwatch;
import java.io.IOException;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -29,13 +31,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
-
import org.apache.bookkeeper.bookie.BookieThread;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
+import org.apache.bookkeeper.client.BKException.BKLedgerRecoveryException;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BKException.BKReadException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -61,6 +62,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
+import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
+import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
/**
@@ -68,24 +71,25 @@ import static org.apache.bookkeeper.replication.ReplicationStats.REREPLICATE_OP;
* ZKLedgerUnderreplicationManager and replicates to it.
*/
public class ReplicationWorker implements Runnable {
- private final static Logger LOG = LoggerFactory
+ private static final Logger LOG = LoggerFactory
.getLogger(ReplicationWorker.class);
- final private LedgerUnderreplicationManager underreplicationManager;
+ private final LedgerUnderreplicationManager underreplicationManager;
private final ServerConfiguration conf;
private final ZooKeeper zkc;
private volatile boolean workerRunning = false;
- private volatile boolean isInReadOnlyMode = false;
- final private BookKeeperAdmin admin;
+ private final BookKeeperAdmin admin;
private final LedgerChecker ledgerChecker;
- private final BookieSocketAddress targetBookie;
private final BookKeeper bkc;
private final Thread workerThread;
+ private final long rwRereplicateBackoffMs;
private final long openLedgerRereplicationGracePeriod;
private final Timer pendingReplicationTimer;
// Expose Stats
+ private final StatsLogger statsLogger;
private final OpStatsLogger rereplicateOpStats;
private final Counter numLedgersReplicated;
+ private final Map<String,Counter> exceptionCounters;
/**
* Replication worker for replicating the ledger fragments from
@@ -96,15 +100,12 @@ public class ReplicationWorker implements Runnable {
* - ZK instance
* @param conf
* - configurations
- * @param targetBKAddr
- * - to where replication should happen. Ideally this will be
- * local Bookie address.
*/
public ReplicationWorker(final ZooKeeper zkc,
- final ServerConfiguration conf, BookieSocketAddress targetBKAddr)
+ final ServerConfiguration conf)
throws CompatibilityException, KeeperException,
InterruptedException, IOException {
- this(zkc, conf, targetBKAddr, NullStatsLogger.INSTANCE);
+ this(zkc, conf, NullStatsLogger.INSTANCE);
}
/**
@@ -116,18 +117,16 @@ public class ReplicationWorker implements Runnable {
* - ZK instance
* @param conf
* - configurations
- * @param targetBKAddr
- * - to where replication should happen. Ideally this will be
- * local Bookie address.
+ * @param statsLogger
+ * - stats logger
*/
public ReplicationWorker(final ZooKeeper zkc,
- final ServerConfiguration conf, BookieSocketAddress targetBKAddr,
+ final ServerConfiguration conf,
StatsLogger statsLogger)
throws CompatibilityException, KeeperException,
InterruptedException, IOException {
this.zkc = zkc;
this.conf = conf;
- this.targetBookie = targetBKAddr;
LedgerManagerFactory mFactory = LedgerManagerFactory
.newLedgerManagerFactory(this.conf, this.zkc);
this.underreplicationManager = mFactory
@@ -141,11 +140,14 @@ public class ReplicationWorker implements Runnable {
this.workerThread = new BookieThread(this, "ReplicationWorker");
this.openLedgerRereplicationGracePeriod = conf
.getOpenLedgerRereplicationGracePeriod();
+ this.rwRereplicateBackoffMs = conf.getRwRereplicateBackoffMs();
this.pendingReplicationTimer = new Timer("PendingReplicationTimer");
// Expose Stats
- this.rereplicateOpStats = statsLogger.getOpStatsLogger(REREPLICATE_OP);
- this.numLedgersReplicated = statsLogger.getCounter(ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+ this.statsLogger = statsLogger;
+ this.rereplicateOpStats = this.statsLogger.getOpStatsLogger(REREPLICATE_OP);
+ this.numLedgersReplicated = this.statsLogger.getCounter(NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED);
+ this.exceptionCounters = new HashMap<String, Counter>();
}
/** Start the replication worker */
@@ -167,38 +169,23 @@ public class ReplicationWorker implements Runnable {
return;
} catch (BKException e) {
LOG.error("BKException while replicating fragments", e);
- if (e instanceof BKException.BKWriteOnReadOnlyBookieException) {
- waitTillTargetBookieIsWritable();
- } else {
- waitBackOffTime();
- }
+ waitBackOffTime(rwRereplicateBackoffMs);
} catch (UnavailableException e) {
LOG.error("UnavailableException "
+ "while replicating fragments", e);
- waitBackOffTime();
+ waitBackOffTime(rwRereplicateBackoffMs);
}
}
LOG.info("ReplicationWorker exited loop!");
}
- private static void waitBackOffTime() {
+ private static void waitBackOffTime(long backoffMs) {
try {
- Thread.sleep(5000);
+ Thread.sleep(backoffMs);
} catch (InterruptedException e) {
}
}
- private void waitTillTargetBookieIsWritable() {
- LOG.info("Waiting for target bookie {} to be back in read/write mode", targetBookie);
- while (workerRunning && admin.getReadOnlyBookiesAsync().contains(targetBookie)) {
- isInReadOnlyMode = true;
- waitBackOffTime();
- }
-
- isInReadOnlyMode = false;
- LOG.info("Target bookie {} is back in read/write mode", targetBookie);
- }
-
/**
* Replicates the under replicated fragments from failed bookie ledger to
* targetBookie
@@ -229,39 +216,22 @@ public class ReplicationWorker implements Runnable {
LOG.debug("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
}
try (LedgerHandle lh = admin.openLedgerNoRecovery(ledgerIdToReplicate)) {
- Set<LedgerFragment> fragments = getUnderreplicatedFragments(lh);
+ Set<LedgerFragment> fragmentsBeforeReplicate = getUnderreplicatedFragments(lh);
if (LOG.isDebugEnabled()) {
- LOG.debug("Founds fragments {} for replication from ledger: {}", fragments, ledgerIdToReplicate);
+ LOG.debug("Founds fragments {} for replication from ledger: {}",
+ fragmentsBeforeReplicate, ledgerIdToReplicate);
}
boolean foundOpenFragments = false;
long numFragsReplicated = 0;
- for (LedgerFragment ledgerFragment : fragments) {
+ for (LedgerFragment ledgerFragment : fragmentsBeforeReplicate) {
if (!ledgerFragment.isClosed()) {
foundOpenFragments = true;
continue;
- } else if (isTargetBookieExistsInFragmentEnsemble(lh,
- ledgerFragment)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Target Bookie[{}] found in the fragment ensemble: {}", targetBookie,
- ledgerFragment.getEnsemble());
- }
- continue;
- }
- try {
- admin.replicateLedgerFragment(lh, ledgerFragment, targetBookie);
- numFragsReplicated++;
- } catch (BKException.BKBookieHandleNotAvailableException e) {
- LOG.warn("BKBookieHandleNotAvailableException "
- + "while replicating the fragment", e);
- } catch (BKException.BKLedgerRecoveryException e) {
- LOG.warn("BKLedgerRecoveryException "
- + "while replicating the fragment", e);
- if (admin.getReadOnlyBookiesAsync().contains(targetBookie)) {
- underreplicationManager.releaseUnderreplicatedLedger(ledgerIdToReplicate);
- throw new BKException.BKWriteOnReadOnlyBookieException();
- }
}
+ LOG.info("Going to replicate the fragments of the ledger: {}", ledgerIdToReplicate);
+ admin.replicateLedgerFragment(lh, ledgerFragment);
+ numFragsReplicated++;
}
if (numFragsReplicated > 0) {
@@ -273,13 +243,13 @@ public class ReplicationWorker implements Runnable {
return false;
}
- fragments = getUnderreplicatedFragments(lh);
- if (fragments.size() == 0) {
- LOG.info("Ledger replicated successfully. ledger id is: "
- + ledgerIdToReplicate);
+ Set<LedgerFragment> fragmentsAfterReplicate = getUnderreplicatedFragments(lh);
+ if (fragmentsAfterReplicate.size() == 0) {
+ LOG.info("Ledger {} is replicated successfully.", ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
return true;
} else {
+ LOG.info("Fail to replicate ledger {}.", ledgerIdToReplicate);
// Releasing the underReplication ledger lock and compete
// for the replication again for the pending fragments
underreplicationManager
@@ -289,30 +259,34 @@ public class ReplicationWorker implements Runnable {
} catch (BKNoSuchLedgerExistsException e) {
// Ledger might have been deleted by user
LOG.info("BKNoSuchLedgerExistsException while opening "
- + "ledger for replication. Other clients "
+ + "ledger {} for replication. Other clients "
+ "might have deleted the ledger. "
- + "So, no harm to continue");
+ + "So, no harm to continue", ledgerIdToReplicate);
underreplicationManager.markLedgerReplicated(ledgerIdToReplicate);
+ getExceptionCounter("BKNoSuchLedgerExistsException").inc();
return false;
- } catch (BKReadException e) {
- LOG.info("BKReadException while"
- + " opening ledger for replication."
- + " Enough Bookies might not have available"
- + "So, no harm to continue");
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
- return false;
- } catch (BKBookieHandleNotAvailableException e) {
- LOG.info("BKBookieHandleNotAvailableException while"
- + " opening ledger for replication."
- + " Enough Bookies might not have available"
- + "So, no harm to continue");
- underreplicationManager
- .releaseUnderreplicatedLedger(ledgerIdToReplicate);
+ } catch (BKNotEnoughBookiesException e) {
+ logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
+ throw e;
+ } catch (BKException e) {
+ logBKExceptionAndReleaseLedger(e, ledgerIdToReplicate);
return false;
}
}
+ private void logBKExceptionAndReleaseLedger(BKException e, long ledgerIdToReplicate) // Shared BKException handling: log it, release the ledger, count the exception type.
+ throws UnavailableException { // NOTE(review): presumably propagated from releaseUnderreplicatedLedger -- confirm
+ LOG.info("{} while"
+ + " rereplicating ledger {}."
+ + " Enough Bookies might not have available"
+ + " So, no harm to continue",
+ e.getClass().getSimpleName(), // fills the first {} placeholder with the exception's simple class name
+ ledgerIdToReplicate); // fills the second {} placeholder with the ledger id
+ underreplicationManager
+ .releaseUnderreplicatedLedger(ledgerIdToReplicate); // same release call the removed per-exception catch blocks made inline
+ getExceptionCounter(e.getClass().getSimpleName()).inc(); // one counter per exception simple class name
+ }
+
/**
* When checking the fragments of a ledger, there is a corner case
* where if the last segment/ensemble is open, but nothing has been written to
@@ -471,21 +445,6 @@ public class ReplicationWorker implements Runnable {
return workerRunning && workerThread.isAlive();
}
- boolean isInReadOnlyMode() {
- return isInReadOnlyMode;
- }
-
- private boolean isTargetBookieExistsInFragmentEnsemble(LedgerHandle lh,
- LedgerFragment ledgerFragment) {
- List<BookieSocketAddress> ensemble = ledgerFragment.getEnsemble();
- for (BookieSocketAddress bkAddr : ensemble) {
- if (targetBookie.equals(bkAddr)) {
- return true;
- }
- }
- return false;
- }
-
/** Ledger checker call back */
private static class CheckerCallback implements
GenericCallback<Set<LedgerFragment>> {
@@ -508,4 +467,13 @@ public class ReplicationWorker implements Runnable {
}
}
+ private Counter getExceptionCounter(String name) { // Returns the counter registered for name, creating and caching it on first use.
+ Counter counter = this.exceptionCounters.get(name); // look in the local cache first
+ if (counter == null) {
+ counter = this.statsLogger.scope(REPLICATE_EXCEPTION).getCounter(name); // obtained from the REPLICATE_EXCEPTION sub-scope of the worker's stats logger
+ this.exceptionCounters.put(name, counter); // NOTE(review): cache is a plain HashMap -- confirm it is only touched from one thread
+ }
+ return counter;
+ }
+
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 183af5b..f415152 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -20,6 +20,14 @@
*/
package org.apache.bookkeeper.client;
+import com.google.common.util.concurrent.SettableFuture;
+import io.netty.buffer.ByteBuf;
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -37,17 +45,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.SettableFuture;
-
-import io.netty.buffer.ByteBuf;
-
-import java.io.IOException;
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.junit.Assert.*;
/**
@@ -518,7 +515,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
try {
bkadmin.replicateLedgerFragment(lh3,
- checkercb.getResult(10, TimeUnit.SECONDS).iterator().next(), newBookie);
+ checkercb.getResult(10, TimeUnit.SECONDS).iterator().next());
fail("Shouldn't be able to replicate with a closed client");
} catch (BKException.BKClientClosedException cce) {
// correct behaviour
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index fc78d8f..a9ccf89 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -322,13 +322,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
// Call the async recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
- BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
- newBookiePort);
- LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
- + bookieDest + ")");
// Initiate the sync object
sync.value = false;
- bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+ bkAdmin.asyncRecoverBookieData(bookieSrc, bookieRecoverCb, sync);
// Wait for the async method to complete.
synchronized (sync) {
@@ -380,12 +376,11 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
// Call the async recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
- BookieSocketAddress bookieDest = null;
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ ") and replicate it to a random available one");
// Initiate the sync object
sync.value = false;
- bkAdmin.asyncRecoverBookieData(bookieSrc, bookieDest, bookieRecoverCb, sync);
+ bkAdmin.asyncRecoverBookieData(bookieSrc, bookieRecoverCb, sync);
// Wait for the async method to complete.
synchronized (sync) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
index 83ec593..3bcac7d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerChecker.java
@@ -91,8 +91,13 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
LOG.info("unreplicated fragment: {}", r);
}
assertEquals("Should have one missing fragment", 1, result.size());
- assertEquals("Fragment should be missing from first replica", result
- .iterator().next().getAddress(), replicaToKill);
+ assertEquals("There should be 1 fragments. But returned fragments are "
+ + result, 1, result.size());
+ LedgerFragment lf = result.iterator().next();
+ assertEquals("There should be 1 failed bookies in first fragment " + lf,
+ 1, lf.getBookiesIndexes().size());
+ assertEquals("Fragment should be missing from first replica",
+ lf.getAddress(0), replicaToKill);
BookieSocketAddress replicaToKill2 = lh.getLedgerMetadata()
.getEnsembles().get(0L).get(1);
@@ -104,7 +109,16 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
for (LedgerFragment r : result) {
LOG.info("unreplicated fragment: {}", r);
}
- assertEquals("Should have three missing fragments", 3, result.size());
+ assertEquals("Should have two missing fragments", 2, result.size());
+ for (LedgerFragment fragment : result) {
+ if (fragment.getFirstEntryId() == 0L) {
+ assertEquals("There should be 2 failed bookies in first fragment",
+ 2, fragment.getBookiesIndexes().size());
+ } else {
+ assertEquals("There should be 1 failed bookies in second fragment",
+ 1, fragment.getBookiesIndexes().size());
+ }
+ }
}
/**
@@ -191,7 +205,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
LOG.info("unreplicated fragment: {}", r);
}
- assertEquals("There should be 2 fragments", 2, result.size());
+ assertEquals("There should be 1 fragments", 1, result.size());
+ assertEquals("There should be 2 failed bookies in the fragment",
+ 2, result.iterator().next().getBookiesIndexes().size());
}
/**
@@ -264,7 +280,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
LOG.info("unreplicated fragment: {}", r);
}
- assertEquals("There should be 3 fragments", 3, result.size());
+ assertEquals("There should be 1 fragments", 1, result.size());
+ assertEquals("There should be 3 failed bookies in the fragment",
+ 3, result.iterator().next().getBookiesIndexes().size());
}
/**
@@ -297,7 +315,9 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
}
Set<LedgerFragment> result = getUnderReplicatedFragments(lh);
assertNotNull("Result shouldn't be null", result);
- assertEquals("There should be 2 fragments.", 2, result.size());
+ assertEquals("There should be 1 fragments.", 1, result.size());
+ assertEquals("There should be 2 failed bookies in the fragment",
+ 2, result.iterator().next().getBookiesIndexes().size());
}
/**
@@ -326,6 +346,8 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
assertNotNull("Result shouldn't be null", result);
assertEquals("There should be 1 fragment. But returned fragments are "
+ result, 1, result.size());
+ assertEquals("There should be 1 failed bookies in the fragment",
+ 1, result.iterator().next().getBookiesIndexes().size());
}
/**
@@ -364,8 +386,17 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
Set<LedgerFragment> result = getUnderReplicatedFragments(lh1);
assertNotNull("Result shouldn't be null", result);
- assertEquals("There should be 3 fragment. But returned fragments are "
- + result, 3, result.size());
+ assertEquals("There should be 2 fragments. But returned fragments are "
+ + result, 2, result.size());
+ for (LedgerFragment lf : result) {
+ if (lf.getFirstEntryId() == 0L) {
+ assertEquals("There should be 2 failed bookies in first fragment",
+ 2, lf.getBookiesIndexes().size());
+ } else {
+ assertEquals("There should be 1 failed bookie in second fragment",
+ 1, lf.getBookiesIndexes().size());
+ }
+ }
}
/**
@@ -439,6 +470,8 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
assertNotNull("Result shouldn't be null", result);
assertEquals("There should be 1 fragment. But returned fragments are "
+ result, 1, result.size());
+ assertEquals("There should be 1 failed bookies in the fragment",
+ 1, result.iterator().next().getBookiesIndexes().size());
lh1.close();
// kill bookie 0
@@ -454,8 +487,10 @@ public class TestLedgerChecker extends BookKeeperClusterTestCase {
result = getUnderReplicatedFragments(lh1);
assertNotNull("Result shouldn't be null", result);
- assertEquals("There should be 2 fragment. But returned fragments are "
- + result, 2, result.size());
+ assertEquals("There should be 1 fragment. But returned fragments are "
+ + result, 1, result.size());
+ assertEquals("There should be 2 failed bookies in the fragment",
+ 2, result.iterator().next().getBookiesIndexes().size());
lh1.close();
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
index c0ff8d7..e2d85e0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestLedgerFragmentReplication.java
@@ -19,6 +19,7 @@
*/
package org.apache.bookkeeper.client;
+import com.google.common.collect.Sets;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Enumeration;
@@ -26,7 +27,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CountDownLatch;
-
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
@@ -104,7 +104,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
// 0-9 entries should be copy to new bookie
for (LedgerFragment lf : result) {
- admin.replicateLedgerFragment(lh, lf, newBkAddr);
+ admin.replicateLedgerFragment(lh, lf);
}
// Killing all bookies except newly replicated bookie
@@ -170,11 +170,11 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
int unclosedCount = 0;
for (LedgerFragment lf : result) {
if (lf.isClosed()) {
- admin.replicateLedgerFragment(lh, lf, newBkAddr);
+ admin.replicateLedgerFragment(lh, lf);
} else {
unclosedCount++;
try {
- admin.replicateLedgerFragment(lh, lf, newBkAddr);
+ admin.replicateLedgerFragment(lh, lf);
fail("Shouldn't be able to rereplicate unclosed ledger");
} catch (BKException bke) {
// correct behaviour
@@ -216,12 +216,9 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
Set<LedgerFragment> fragments = getFragmentsToReplicate(lh);
BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
- int startNewBookie = startNewBookie();
- BookieSocketAddress additionalBK = new BookieSocketAddress(InetAddress
- .getLocalHost().getHostAddress(), startNewBookie);
for (LedgerFragment lf : fragments) {
try {
- admin.replicateLedgerFragment(lh, lf, additionalBK);
+ admin.replicateLedgerFragment(lh, lf);
} catch (BKException.BKLedgerRecoveryException e) {
// expected
}
@@ -265,7 +262,7 @@ public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
final long oriFragmentLastEntry, long entriesPerSubFragment,
long expectedSubFragments, LedgerHandle lh) {
LedgerFragment fr = new LedgerFragment(lh, oriFragmentFirstEntry,
- oriFragmentLastEntry, 0) {
+ oriFragmentLastEntry, Sets.newHashSet(0)) {
@Override
public long getLastStoredEntryId() {
return oriFragmentLastEntry;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index d273dab..2ce591b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -79,6 +79,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
baseConf.setLedgerManagerFactoryClassName(
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
baseConf.setOpenLedgerRereplicationGracePeriod(openLedgerRereplicationGracePeriod);
+ baseConf.setRwRereplicateBackoffMs(500);
baseClientConf.setLedgerManagerFactoryClassName(
"org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
this.digestType = DigestType.MAC;
@@ -550,8 +551,7 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
}
- private int getReplicaIndexInLedger(LedgerHandle lh,
- BookieSocketAddress replicaToKill) {
+ private int getReplicaIndexInLedger(LedgerHandle lh, BookieSocketAddress replicaToKill) {
SortedMap<Long, ArrayList<BookieSocketAddress>> ensembles = LedgerHandleAdapter
.getLedgerMetadata(lh).getEnsembles();
int ledgerReplicaIndex = -1;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 0ec3bb0..e873f15 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
import java.net.InetAddress;
import java.util.ArrayList;
@@ -39,7 +38,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -132,7 +130,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();
try {
@@ -179,7 +177,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
LOG.info("New Bookie addr :" + newBkAddr);
killAllBookies(lh, newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();
try {
@@ -231,7 +229,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
BookieSocketAddress newBkAddr1 = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie1);
LOG.info("New Bookie addr :" + newBkAddr1);
- ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf, newBkAddr1);
+ ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);
// Starte RW2
int startNewBookie2 = startNewBookie();
@@ -242,8 +240,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(10000)
.build();
- ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf,
- newBkAddr2);
+ ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf);
rw1.start();
rw2.start();
@@ -296,7 +293,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();
try {
@@ -354,7 +351,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();
try {
@@ -413,7 +410,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
// set to 3s instead of default 30s
baseConf.setOpenLedgerRereplicationGracePeriod("3000");
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
LedgerManagerFactory mFactory = LedgerManagerFactory
.newLedgerManagerFactory(baseClientConf, zkc);
@@ -474,7 +471,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
+ ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
LedgerManagerFactory mFactory = LedgerManagerFactory
.newLedgerManagerFactory(baseClientConf, zkc);
@@ -509,49 +506,6 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
}
/**
- * Test that if the local bookie turns out to be read-only, then the replicator will pause but not shutdown.
- */
- @Test
- public void testRWOnLocalBookieReadonlyTransition() throws Exception {
- LedgerHandle lh = bkc.createLedger(3, 3, BookKeeper.DigestType.CRC32, TESTPASSWD);
-
- for (int i = 0; i < 10; i++) {
- lh.addEntry(data);
- }
- BookieSocketAddress replicaToKill =
- LedgerHandleAdapter.getLedgerMetadata(lh).getEnsembles().get(0L).get(0);
-
- LOG.info("Killing Bookie", replicaToKill);
- killBookie(replicaToKill);
-
- int newBkPort = startNewBookie();
- for (int i = 0; i < 10; i++) {
- lh.addEntry(data);
- }
-
- BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), newBkPort);
- LOG.info("New Bookie addr :" + newBkAddr);
-
- ReplicationWorker rw = new ReplicationWorker(zkc, baseConf, newBkAddr);
-
- rw.start();
- try {
- BookieServer newBk = bs.get(bs.size() - 1);
- bsConfs.get(bsConfs.size() - 1).setReadOnlyModeEnabled(true);
- newBk.getBookie().doTransitionToReadOnlyMode();
- underReplicationManager.markLedgerUnderreplicated(lh.getId(), replicaToKill.toString());
- while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(), basePath) && rw.isRunning()
- && !rw.isInReadOnlyMode()) {
- Thread.sleep(100);
- }
- assertNull(zkc.exists(String.format("%s/urL%010d", baseLockPath, lh.getId()), false));
- assertTrue("RW should continue even if the bookie is readonly", rw.isRunning());
- } finally {
- rw.shutdown();
- }
- }
-
- /**
* Test that the replication worker will not shutdown on a simple ZK disconnection
*/
@Test
@@ -562,7 +516,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
.build();
try {
- ReplicationWorker rw = new ReplicationWorker(zk, baseConf, getBookie(0));
+ ReplicationWorker rw = new ReplicationWorker(zk, baseConf);
rw.start();
for (int i = 0; i < 10; i++) {
if (rw.isRunning()) {
--
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.