You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/07/24 11:10:14 UTC

[GitHub] eolivelli closed pull request #1555: Provide BookieId option for listledgers command

eolivelli closed pull request #1555: Provide BookieId option for listledgers command
URL: https://github.com/apache/bookkeeper/pull/1555
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display it once the pull request is merged):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index d500e710f..c4ead3665 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -66,9 +66,11 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.LongStream;
 import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
@@ -93,14 +95,13 @@
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.meta.LedgerManager;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
-import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -134,6 +135,8 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -970,43 +973,78 @@ int runCmd(CommandLine cmdLine) throws Exception {
         ListLedgersCmd() {
             super(CMD_LISTLEDGERS);
             lOpts.addOption("m", "meta", false, "Print metadata");
-
+            lOpts.addOption("bookieid", true, "List ledgers residing in this bookie");
         }
 
         @Override
         public int runCmd(CommandLine cmdLine) throws Exception {
+            final boolean printMeta = cmdLine.hasOption("m");
+            final String bookieidToBePartOfEnsemble = cmdLine.getOptionValue("bookieid");
+            final BookieSocketAddress bookieAddress = StringUtils.isBlank(bookieidToBePartOfEnsemble) ? null
+                    : new BookieSocketAddress(bookieidToBePartOfEnsemble);
+
             runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
-                try (LedgerManager m = mFactory.newLedgerManager()) {
-                    LedgerRangeIterator iter = m.getLedgerRanges();
-                    if (cmdLine.hasOption("m")) {
-                        List<ReadMetadataCallback> futures = new ArrayList<ReadMetadataCallback>(LIST_BATCH_SIZE);
-                        while (iter.hasNext()) {
-                            LedgerRange r = iter.next();
-                            for (Long lid : r.getLedgers()) {
-                                ReadMetadataCallback cb = new ReadMetadataCallback(lid);
-                                m.readLedgerMetadata(lid, cb);
-                                futures.add(cb);
-                            }
-                            if (futures.size() >= LIST_BATCH_SIZE) {
-                                while (futures.size() > 0) {
-                                    ReadMetadataCallback cb = futures.remove(0);
-                                    printLedgerMetadata(cb);
-                                }
+                try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+
+                    final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+                    final CountDownLatch processDone = new CountDownLatch(1);
+
+                    Processor<Long> ledgerProcessor = new Processor<Long>() {
+                        @Override
+                        public void process(Long ledgerId, VoidCallback cb) {
+                            if (!printMeta && (bookieAddress == null)) {
+                                printLedgerMetadata(ledgerId, null, false);
+                                cb.processResult(BKException.Code.OK, null, null);
+                            } else {
+                                GenericCallback<LedgerMetadata> gencb = new GenericCallback<LedgerMetadata>() {
+                                    @Override
+                                    public void operationComplete(int rc, LedgerMetadata ledgerMetadata) {
+                                        if (rc == BKException.Code.OK) {
+                                            if ((bookieAddress == null)
+                                                    || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie(ledgerId,
+                                                            bookieAddress, ledgerMetadata)) {
+                                                /*
+                                                 * the print method has to be in
+                                                 * synchronized scope, otherwise
+                                                 * output of printLedgerMetadata
+                                                 * could interleave since this
+                                                 * callback for different
+                                                 * ledgers can happen in
+                                                 * different threads.
+                                                 */
+                                                synchronized (BookieShell.this) {
+                                                    printLedgerMetadata(ledgerId, ledgerMetadata, printMeta);
+                                                }
+                                            }
+                                        } else if (rc == BKException.Code.NoSuchLedgerExistsException) {
+                                            rc = BKException.Code.OK;
+                                        } else {
+                                            LOG.error("Unable to read the ledger: " + ledgerId + " information");
+                                        }
+                                        cb.processResult(rc, null, null);
+                                    }
+                                };
+                                ledgerManager.readLedgerMetadata(ledgerId, gencb);
                             }
                         }
-                        while (futures.size() > 0) {
-                            ReadMetadataCallback cb = futures.remove(0);
-                            printLedgerMetadata(cb);
-                        }
-                    } else {
-                        while (iter.hasNext()) {
-                            LedgerRange r = iter.next();
-                            for (Long lid : r.getLedgers()) {
-                                System.out.println(ledgerIdFormatter.formatLedgerId(lid));
-                            }
+                    };
+
+                    ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
+                        @Override
+                        public void processResult(int rc, String s, Object obj) {
+                            returnCode.set(rc);
+                            processDone.countDown();
                         }
+                    }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+                    processDone.await();
+                    if (returnCode.get() != BKException.Code.OK) {
+                        LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
+                        throw BKException.create(returnCode.get());
                     }
+
                 } catch (Exception ioe) {
+                    LOG.error("Received Exception while processing ledgers", ioe);
                     throw new UncheckedExecutionException(ioe);
                 }
                 return null;
@@ -1022,7 +1060,7 @@ String getDescription() {
 
         @Override
         String getUsage() {
-            return "listledgers  [-meta]";
+            return "listledgers  [-meta] [-bookieid <bookieaddress>]";
         }
 
         @Override
@@ -1031,10 +1069,11 @@ Options getOptions() {
         }
     }
 
-    void printLedgerMetadata(ReadMetadataCallback cb) throws Exception {
-        LedgerMetadata md = cb.get();
-        System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(cb.getLedgerId()));
-        System.out.println(new String(md.serialize(), UTF_8));
+    void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
+        System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
+        if (printMeta) {
+            System.out.println(new String(md.serialize(), UTF_8));
+        }
     }
 
     static class ReadMetadataCallback extends AbstractFuture<LedgerMetadata>
@@ -1082,7 +1121,7 @@ public int runCmd(CommandLine cmdLine) throws Exception {
                 try (LedgerManager m = mFactory.newLedgerManager()) {
                     ReadMetadataCallback cb = new ReadMetadataCallback(lid);
                     m.readLedgerMetadata(lid, cb);
-                    printLedgerMetadata(cb);
+                    printLedgerMetadata(lid, cb.get(), true);
                 } catch (Exception e) {
                     throw new UncheckedExecutionException(e);
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 63f1e5d6b..a55ace85f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -1520,25 +1520,13 @@ private void waitForLedgersToBeReplicated(Collection<Long> ledgers, BookieSocket
         }
     }
 
-    private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+    public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
             LedgerManager ledgerManager) {
         ReadMetadataCallback cb = new ReadMetadataCallback(ledgerId);
         ledgerManager.readLedgerMetadata(ledgerId, cb);
         try {
             LedgerMetadata ledgerMetadata = cb.get();
-            Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
-            Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
-            ArrayList<BookieSocketAddress> ensemble;
-            int segmentNo = 0;
-            while (ensemblesOfSegmentsIterator.hasNext()) {
-                ensemble = ensemblesOfSegmentsIterator.next();
-                if (ensemble.contains(bookieAddress)) {
-                    if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
-                        return true;
-                    }
-                }
-            }
-            return false;
+            return areEntriesOfLedgerStoredInTheBookie(ledgerId, bookieAddress, ledgerMetadata);
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(ie);
@@ -1554,7 +1542,24 @@ private boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketA
         }
     }
 
-    private boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
+    public static boolean areEntriesOfLedgerStoredInTheBookie(long ledgerId, BookieSocketAddress bookieAddress,
+            LedgerMetadata ledgerMetadata) {
+        Collection<ArrayList<BookieSocketAddress>> ensemblesOfSegments = ledgerMetadata.getEnsembles().values();
+        Iterator<ArrayList<BookieSocketAddress>> ensemblesOfSegmentsIterator = ensemblesOfSegments.iterator();
+        ArrayList<BookieSocketAddress> ensemble;
+        int segmentNo = 0;
+        while (ensemblesOfSegmentsIterator.hasNext()) {
+            ensemble = ensemblesOfSegmentsIterator.next();
+            if (ensemble.contains(bookieAddress)) {
+                if (areEntriesOfSegmentStoredInTheBookie(ledgerMetadata, bookieAddress, segmentNo++)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static boolean areEntriesOfSegmentStoredInTheBookie(LedgerMetadata ledgerMetadata,
             BookieSocketAddress bookieAddress, int segmentNo) {
         boolean isLedgerClosed = ledgerMetadata.isClosed();
         int ensembleSize = ledgerMetadata.getEnsembleSize();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services