You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2018/07/24 11:10:11 UTC

[bookkeeper] branch master updated: Provide BookieId option for listledgers command

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ddbaac  Provide BookieId option for listledgers command
3ddbaac is described below

commit 3ddbaace58a2b95304d7b05e5c86afb9f509c067
Author: cguttapalem <cg...@salesforce.com>
AuthorDate: Tue Jul 24 13:10:04 2018 +0200

    Provide BookieId option for listledgers command
    
    Descriptions of the changes in this PR:
    
    ### Motivation
    
    While diagnosing, analyzing, or debugging a cluster, it is sometimes helpful to know the list of ledgers residing in a particular bookie. Currently there is no simple way (i.e., no BookieShell command) to find that out.
    
    ### Changes
    
    For the listledgers BookieShell command, provide a bookieid option, which lists only the ledgers residing in the specified bookie.
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>, Sijie Guo <si...@apache.org>
    
    This closes #1555 from reddycharan/printledgersofbookie
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  | 109 ++++++++++++++-------
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  35 ++++---
 2 files changed, 94 insertions(+), 50 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d500e71..c4ead36 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -66,9 +66,11 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.LongStream;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
@@ -93,14 +95,13 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -134,6 +135,8 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -970,43 +973,78 @@ public class BookieShell implements Tool {
         ListLedgersCmd() {
             super(CMD_LISTLEDGERS);
             lOpts.addOption("m", "meta", false, "Print metadata");
-
+            lOpts.addOption("bookieid", true, "List ledgers residing in this bookie");
         }
 
         @Override
         public int runCmd(CommandLine cmdLine) throws Exception {
+            final boolean printMeta = cmdLine.hasOption("m");
+            final String bookieidToBePartOfEnsemble = cmdLine.getOptionValue("bookieid");
+            final BookieSocketAddress bookieAddress = StringUtils.isBlank(bookieidToBePartOfEnsemble) ? null
+                    : new BookieSocketAddress(bookieidToBePartOfEnsemble);
+
             runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
-                try (LedgerManager m = mFactory.newLedgerManager()) {
-                    LedgerRangeIterator iter = m.getLedgerRanges();
-                    if (cmdLine.hasOption("m")) {
-                        List<ReadMetadataCallback> futures = new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
-                        while (iter.hasNext()) {
-                            LedgerRange r = iter.next();
-                            for (Long lid : r.getLedgers()) {
-                                ReadMetadataCallback cb = new ReadMetadataCallback(lid);
-                                m.readLedgerMetadata(lid, cb);
-                                futures.add(cb);
-                            }
-                            if (futures.size() >= LIST_BATCH_SIZE) {
-                                while (futures.size() > 0) {
-                                    ReadMetadataCallback cb = futures.remove(0);
-                                    printLedgerMetadata(cb);
-                                }
+                try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+
+                    final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+                    final CountDownLatch processDone = new CountDownLatch(1);
+
+                    Processor<Long> ledgerProcessor = new Processor<Long>() {
+                        @Override
+                        public void process(Long ledgerId, VoidCallback cb) {
+                            if (!printMeta && (bookieAddress == null)) {
+                                printLedgerMetadata(ledgerId, null, false);
+                                cb.processResult(BKException.Code.OK, null, null);
+                            } else {
+                                GenericCallback<LedgerMetadata> gencb = new GenericCallback<LedgerMetadata>() {
+                                    @Override
+                                    public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+                                        if (rc == BKException.Code.OK) {
+                                            if ((bookieAddress == null)
+                                                    || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie(ledgerId,
+                                                            bookieAddress, ledgerMetadata)) {
+                                                /*
+                                                 * the print method has to be in
+                                                 * synchronized scope, otherwise
+                                                 * output of printLedgerMetadata
+                                                 * could interleave since this
+                                                 * callback for different
+                                                 * ledgers can happen in
+                                                 * different threads.
+                                                 */
+                                                synchronized (BookieShell.this) {
+                                                    printLedgerMetadata(ledgerId, ledgerMetadata, printMeta);
+                                                }
+                                            }
+                                        } else if (rc == BKException.Code.NoSuchLedgerExistsException) {
+                                            rc = BKException.Code.OK;
+                                        } else {
+                                            LOG.error("Unable to read the ledger: " + ledgerId + " information");
+                                        }
+                                        cb.processResult(rc, null, null);
+                                    }
+                                };
+                                ledgerManager.readLedgerMetadata(ledgerId, gencb);
                             }
                         }
-                        while (futures.size() > 0) {
-                            ReadMetadataCallback cb = futures.remove(0);
-                            printLedgerMetadata(cb);
-                        }
-                    } else {
-                        while (iter.hasNext()) {
-                            LedgerRange r = iter.next();
-                            for (Long lid : r.getLedgers()) {
-                                System.out.println(ledgerIdFormatter.formatLedgerId(lid));
-                            }
+                    };
+
+                    ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
+                        @Override
+                        public void processResult(int rc, String s, Object obj) {
+                            returnCode.set(rc);
+                            processDone.countDown();
                         }
+                    }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+                    processDone.await();
+                    if (returnCode.get() != BKException.Code.OK) {
+                        LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
+                        throw BKException.create(returnCode.get());
                     }
+
                 } catch (Exception ioe) {
+                    LOG.error("Received Exception while processing ledgers", ioe);
                     throw new UncheckedExecutionException(ioe);
                 }
                 return null;
@@ -1022,7 +1060,7 @@ public class BookieShell implements Tool {
 
         @Override
         String getUsage() {
-            return "listledgers  [-meta]";
+            return "listledgers  [-meta] [-bookieid <bookieaddress>]";
         }
 
         @Override
@@ -1031,10 +1069,11 @@ public class BookieShell implements Tool {
         }
     }
 
-    void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
-        LedgerMetadata md = cb.get();
-        System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(cb.getLedgerId()));
-        System.out.println(new String(md.serialize(), UTF_8));
+    void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
+        System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
+        if (printMeta) {
+            System.out.println(new String(md.serialize(), UTF_8));
+        }
     }
 
     static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
@@ -1082,7 +1121,7 @@ public class BookieShell implements Tool {
                 try (LedgerManager m = mFactory.newLedgerManager()) {
                     ReadMetadataCallback cb = new ReadMetadataCallback(lid);
                     m.readLedgerMetadata(lid, cb);
-                    printLedgerMetadata(cb);
+                    printLedgerMetadata(lid, cb.get(), true);
                 } catch (Exception e) {
                     throw new UncheckedExecutionException(e);
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 63f1e5d..a55ace8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1520,25 +1520,13 @@ public class BookKeeperAdmin implements AutoCloseable {
         }
     }
 
-    private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+    public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
             LedgerManager ledgerManager) {
         ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
         ledgerManager.readLedgerMetadata(ledgerId, cb);
         try {
             LedgerMetadata ledgerMetadata = cb.get();
-            Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
-            Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
-            ArrayList<BookieSocketAddress> ensemble;
-            int segmentNo = 0;
-            while (ensemblesOfSegmentsIterator.hasNext()) {
-                ensemble = ensemblesOfSegmentsIterator.next();
-                if (ensemble.contains(bookieAddress)) {
-                    if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
-                        return true;
-                    }
-                }
-            }
-            return false;
+            return areEntriesOfLedgerStoredInTheBookie(ledgerId, bookieAddress, ledgerMetadata);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(ie);
@@ -1554,7 +1542,24 @@ public class BookKeeperAdmin implements AutoCloseable {
         }
     }
 
-    private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
+    public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+            LedgerMetadata ledgerMetadata) {
+        Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+        Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+        ArrayList<BookieSocketAddress> ensemble;
+        int segmentNo = 0;
+        while (ensemblesOfSegmentsIterator.hasNext()) {
+            ensemble = ensemblesOfSegmentsIterator.next();
+            if (ensemble.contains(bookieAddress)) {
+                if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
             BookieSocketAddress bookieAddress, int segmentNo) {
         boolean isLedgerClosed = ledgerMetadata.isClosed();
         int ensembleSize = ledgerMetadata.getEnsembleSize();