You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/07/24 11:10:11 UTC
[bookkeeper] branch master updated: Provide BookieId option for
listledgers command
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 3ddbaac Provide BookieId option for listledgers command
3ddbaac is described below
commit 3ddbaace58a2b95304d7b05e5c86afb9f509c067
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jul 24 13:10:04 2018 +0200
Provide BookieId option for listledgers command
Descriptions of the changes in this PR:
### Motivation
While diagnosing, analyzing, or debugging a cluster, it is sometimes helpful to know which ledgers reside on a particular bookie. Currently there is no simple way (i.e., no BookieShell command) to find that out.
### Changes
Add a `bookieid` option to the `listledgers` BookieShell command; when it is supplied, the command lists only the ledgers residing on the specified bookie.
Author: cguttapalem <cg...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
This closes #1555 from reddycharan/printledgersofbookie
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 109 ++++++++++++++-------
.../apache/bookkeeper/client/BookKeeperAdmin.java | 35 ++++---
2 files changed, 94 insertions(+), 50 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d500e71..c4ead36 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -66,9 +66,11 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
@@ -93,14 +95,13 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -134,6 +135,8 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -970,43 +973,78 @@ public class BookieShell implements Tool {
ListLedgersCmd() {
super(CMD_LISTLEDGERS);
lOpts.addOption("m", "meta", false, "Print metadata");
-
+ lOpts.addOption("bookieid", true, "List ledgers residing in this bookie");
}
@Override
public int runCmd(CommandLine cmdLine) throws Exception {
+ final boolean printMeta = cmdLine.hasOption("m");
+ final String bookieidToBePartOfEnsemble = cmdLine.getOptionValue("bookieid");
+ final BookieSocketAddress bookieAddress = StringUtils.isBlank(bookieidToBePartOfEnsemble) ? null
+ : new BookieSocketAddress(bookieidToBePartOfEnsemble);
+
runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
- try (LedgerManager m = mFactory.newLedgerManager()) {
- LedgerRangeIterator iter = m.getLedgerRanges();
- if (cmdLine.hasOption("m")) {
- List<ReadMetadataCallback> futures = new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
- while (iter.hasNext()) {
- LedgerRange r = iter.next();
- for (Long lid : r.getLedgers()) {
- ReadMetadataCallback cb = new ReadMetadataCallback(lid);
- m.readLedgerMetadata(lid, cb);
- futures.add(cb);
- }
- if (futures.size() >= LIST_BATCH_SIZE) {
- while (futures.size() > 0) {
- ReadMetadataCallback cb = futures.remove(0);
- printLedgerMetadata(cb);
- }
+ try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+
+ final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+ final CountDownLatch processDone = new CountDownLatch(1);
+
+ Processor<Long> ledgerProcessor = new Processor<Long>() {
+ @Override
+ public void process(Long ledgerId, VoidCallback cb) {
+ if (!printMeta && (bookieAddress == null)) {
+ printLedgerMetadata(ledgerId, null, false);
+ cb.processResult(BKException.Code.OK, null, null);
+ } else {
+ GenericCallback<LedgerMetadata> gencb = new GenericCallback<LedgerMetadata>() {
+ @Override
+ public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+ if (rc == BKException.Code.OK) {
+ if ((bookieAddress == null)
+ || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie(ledgerId,
+ bookieAddress, ledgerMetadata)) {
+ /*
+ * the print method has to be in
+ * synchronized scope, otherwise
+ * output of printLedgerMetadata
+ * could interleave since this
+ * callback for different
+ * ledgers can happen in
+ * different threads.
+ */
+ synchronized (BookieShell.this) {
+ printLedgerMetadata(ledgerId, ledgerMetadata, printMeta);
+ }
+ }
+ } else if (rc == BKException.Code.NoSuchLedgerExistsException) {
+ rc = BKException.Code.OK;
+ } else {
+ LOG.error("Unable to read the ledger: " + ledgerId + " information");
+ }
+ cb.processResult(rc, null, null);
+ }
+ };
+ ledgerManager.readLedgerMetadata(ledgerId, gencb);
}
}
- while (futures.size() > 0) {
- ReadMetadataCallback cb = futures.remove(0);
- printLedgerMetadata(cb);
- }
- } else {
- while (iter.hasNext()) {
- LedgerRange r = iter.next();
- for (Long lid : r.getLedgers()) {
- System.out.println(ledgerIdFormatter.formatLedgerId(lid));
- }
+ };
+
+ ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String s, Object obj) {
+ returnCode.set(rc);
+ processDone.countDown();
}
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+ processDone.await();
+ if (returnCode.get() != BKException.Code.OK) {
+ LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
+ throw BKException.create(returnCode.get());
}
+
} catch (Exception ioe) {
+ LOG.error("Received Exception while processing ledgers", ioe);
throw new UncheckedExecutionException(ioe);
}
return null;
@@ -1022,7 +1060,7 @@ public class BookieShell implements Tool {
@Override
String getUsage() {
- return "listledgers [-meta]";
+ return "listledgers [-meta] [-bookieid <bookieaddress>]";
}
@Override
@@ -1031,10 +1069,11 @@ public class BookieShell implements Tool {
}
}
- void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
- LedgerMetadata md = cb.get();
- System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(cb.getLedgerId()));
- System.out.println(new String(md.serialize(), UTF_8));
+ void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
+ System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
+ if (printMeta) {
+ System.out.println(new String(md.serialize(), UTF_8));
+ }
}
static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
@@ -1082,7 +1121,7 @@ public class BookieShell implements Tool {
try (LedgerManager m = mFactory.newLedgerManager()) {
ReadMetadataCallback cb = new ReadMetadataCallback(lid);
m.readLedgerMetadata(lid, cb);
- printLedgerMetadata(cb);
+ printLedgerMetadata(lid, cb.get(), true);
} catch (Exception e) {
throw new UncheckedExecutionException(e);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 63f1e5d..a55ace8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1520,25 +1520,13 @@ public class BookKeeperAdmin implements AutoCloseable {
}
}
- private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+ public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
LedgerManager ledgerManager) {
ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
ledgerManager.readLedgerMetadata(ledgerId, cb);
try {
LedgerMetadata ledgerMetadata = cb.get();
- Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
- Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
- ArrayList<BookieSocketAddress> ensemble;
- int segmentNo = 0;
- while (ensemblesOfSegmentsIterator.hasNext()) {
- ensemble = ensemblesOfSegmentsIterator.next();
- if (ensemble.contains(bookieAddress)) {
- if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
- return true;
- }
- }
- }
- return false;
+ return areEntriesOfLedgerStoredInTheBookie(ledgerId, bookieAddress, ledgerMetadata);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
@@ -1554,7 +1542,24 @@ public class BookKeeperAdmin implements AutoCloseable {
}
}
- private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
+ public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+ LedgerMetadata ledgerMetadata) {
+ Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+ Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+ ArrayList<BookieSocketAddress> ensemble;
+ int segmentNo = 0;
+ while (ensemblesOfSegmentsIterator.hasNext()) {
+ ensemble = ensemblesOfSegmentsIterator.next();
+ if (ensemble.contains(bookieAddress)) {
+ if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private static boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
BookieSocketAddress bookieAddress, int segmentNo) {
boolean isLedgerClosed = ledgerMetadata.isClosed();
int ensembleSize = ledgerMetadata.getEnsembleSize();