You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/01 16:10:44 UTC

[bookkeeper] branch master updated: Migrate command `listledgers`

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b043d16  Migrate command `listledgers`
b043d16 is described below

commit b043d1694569a934d9f27347ec2d15180f7a95c0
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Apr 2 00:10:39 2019 +0800

    Migrate command `listledgers`
    
    Descriptions of the changes in this PR:
    
    Use `bkctl` to run the `listledgers` command.
    
    ### Motivation
    
    #2004
    
    
    Reviewers: Jia Zhai <zh...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #2005 from zymap/command-listledgers
---
 .../org/apache/bookkeeper/bookie/BookieShell.java  |  79 +--------
 .../cli/commands/bookie/ListLedgersCommand.java    | 195 +++++++++++++++++++++
 .../tools/cli/commands/BookieCommandGroup.java     |   2 +
 .../commands/bookie/ListLedgersCommandTest.java    | 105 +++++++++++
 4 files changed, 307 insertions(+), 74 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 8c448a1..9746356 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -60,11 +60,9 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -95,7 +93,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieClientImpl;
 import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.replication.ReplicationException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -109,6 +106,7 @@ import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LedgerCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.ListFilesOnDiscCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.ReadJournalCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
@@ -143,8 +141,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang3.ArrayUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -981,76 +977,11 @@ public class BookieShell implements Tool {
         public int runCmd(CommandLine cmdLine) throws Exception {
             final boolean printMeta = cmdLine.hasOption("m");
             final String bookieidToBePartOfEnsemble = cmdLine.getOptionValue("bookieid");
-            final BookieSocketAddress bookieAddress = StringUtils.isBlank(bookieidToBePartOfEnsemble) ? null
-                    : new BookieSocketAddress(bookieidToBePartOfEnsemble);
 
-            runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
-                try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
-
-                    final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
-                    final CountDownLatch processDone = new CountDownLatch(1);
-
-                    Processor<Long> ledgerProcessor = new Processor<Long>() {
-                            @Override
-                            public void process(Long ledgerId, VoidCallback cb) {
-                                if (!printMeta && (bookieAddress == null)) {
-                                    printLedgerMetadata(ledgerId, null, false);
-                                    cb.processResult(BKException.Code.OK, null, null);
-                                } else {
-                                    ledgerManager.readLedgerMetadata(ledgerId).whenComplete(
-                                            (metadata, exception) -> {
-                                                if (exception == null) {
-                                                    if ((bookieAddress == null)
-                                                        || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie(
-                                                                ledgerId, bookieAddress, metadata.getValue())) {
-                                                        /*
-                                                         * the print method has to be in
-                                                         * synchronized scope, otherwise
-                                                         * output of printLedgerMetadata
-                                                         * could interleave since this
-                                                         * callback for different
-                                                         * ledgers can happen in
-                                                         * different threads.
-                                                         */
-                                                        synchronized (BookieShell.this) {
-                                                            printLedgerMetadata(ledgerId, metadata.getValue(),
-                                                                                printMeta);
-                                                        }
-                                                    }
-                                                    cb.processResult(BKException.Code.OK, null, null);
-                                                } else if (BKException.getExceptionCode(exception)
-                                                           == BKException.Code.NoSuchLedgerExistsException) {
-                                                    cb.processResult(BKException.Code.OK, null, null);
-                                                } else {
-                                                    LOG.error("Unable to read the ledger: {} information", ledgerId);
-                                                    cb.processResult(BKException.getExceptionCode(exception),
-                                                                     null, null);
-                                                }
-                                            });
-                                }
-                            }
-                        };
-
-                    ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
-                        @Override
-                        public void processResult(int rc, String s, Object obj) {
-                            returnCode.set(rc);
-                            processDone.countDown();
-                        }
-                    }, null, BKException.Code.OK, BKException.Code.ReadException);
-
-                    processDone.await();
-                    if (returnCode.get() != BKException.Code.OK) {
-                        LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
-                        throw BKException.create(returnCode.get());
-                    }
-
-                } catch (Exception ioe) {
-                    LOG.error("Received Exception while processing ledgers", ioe);
-                    throw new UncheckedExecutionException(ioe);
-                }
-                return null;
-            });
+            ListLedgersCommand.ListLedgersFlags flags = new ListLedgersCommand.ListLedgersFlags()
+                                                            .bookieId(bookieidToBePartOfEnsemble).meta(printMeta);
+            ListLedgersCommand cmd = new ListLedgersCommand(ledgerIdFormatter);
+            cmd.apply(bkConf, flags);
 
             return 0;
         }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java
new file mode 100644
index 0000000..542216f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand.ListLedgersFlags;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command to list all ledgers on the cluster.
+ */
+public class ListLedgersCommand extends BookieCommand<ListLedgersFlags> {
+
+    static final Logger LOG = LoggerFactory.getLogger(ListLedgersCommand.class);
+
+    private static final String NAME = "listledgers";
+    private static final String DESC = "List all ledgers on the cluster (this may take a long time).";
+    private static final String DEFAULT = "";
+
+    private LedgerIdFormatter ledgerIdFormatter;
+
+    public ListLedgersCommand() {
+        this(new ListLedgersFlags());
+    }
+
+    public ListLedgersCommand(LedgerIdFormatter ledgerIdFormatter) {
+        this(new ListLedgersFlags());
+        this.ledgerIdFormatter = ledgerIdFormatter;
+    }
+
+    public ListLedgersCommand(ListLedgersFlags flags) {
+        super(CliSpec.<ListLedgersFlags>newBuilder()
+                  .withName(NAME)
+                  .withDescription(DESC)
+                  .withFlags(flags)
+                  .build());
+    }
+
+    /**
+     * Flags for ListLedgers command.
+     */
+    @Accessors(fluent = true)
+    @Setter
+    public static class ListLedgersFlags extends CliFlags{
+
+        @Parameter(names = {"-m", "--meta"}, description = "Print metadata")
+        private boolean meta;
+
+        @Parameter(names = { "-id", "--bookieid" }, description = "List ledgers residing in this bookie")
+        private String bookieId;
+
+        @Parameter(names = { "-l", "--ledgerIdFormatter" }, description = "Set ledger id formatter")
+        private String ledgerIdFormatter = DEFAULT;
+    }
+
+    @Override
+    public boolean apply(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
+        initLedgerFrommat(conf, cmdFlags);
+        try {
+            handler(conf, cmdFlags);
+        } catch (UnknownHostException e) {
+            System.err.println("Bookie id error");
+            return false;
+        } catch (MetadataException | ExecutionException e) {
+            throw new UncheckedExecutionException(e.getMessage(), e);
+        }
+
+        return true;
+    }
+
+    private void initLedgerFrommat(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
+        if (ledgerIdFormatter != null) {
+            return;
+        }
+        if (!cmdFlags.ledgerIdFormatter.equals(DEFAULT)) {
+            this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(cmdFlags.ledgerIdFormatter, conf);
+        } else {
+            this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
+        }
+    }
+
+    public boolean handler(ServerConfiguration conf, ListLedgersFlags flags)
+        throws UnknownHostException, MetadataException, ExecutionException {
+
+        final BookieSocketAddress bookieAddress = StringUtils.isBlank(flags.bookieId) ? null :
+                                                      new BookieSocketAddress(flags.bookieId);
+
+        runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+            try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+
+                final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+                final CountDownLatch processDone = new CountDownLatch(1);
+
+                BookkeeperInternalCallbacks.Processor<Long> ledgerProcessor = (ledgerId, cb) -> {
+                    if (!flags.meta && (bookieAddress == null)) {
+                        printLedgerMetadata(ledgerId, null, false);
+                        cb.processResult(BKException.Code.OK, null, null);
+                    } else {
+                        ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> {
+                            if (exception == null) {
+                                if ((bookieAddress == null)
+                                        || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie
+                                               (ledgerId, bookieAddress, metadata.getValue())) {
+                                    /*
+                                     * the print method has to be in
+                                     * synchronized scope, otherwise
+                                     * output of printLedgerMetadata
+                                     * could interleave since this
+                                     * callback for different
+                                     * ledgers can happen in
+                                     * different threads.
+                                     */
+                                    synchronized (ListLedgersCommand.this) {
+                                        printLedgerMetadata(ledgerId, metadata.getValue(), flags.meta);
+                                    }
+                                }
+                                cb.processResult(BKException.Code.OK, null, null);
+                            } else if (BKException.getExceptionCode(exception)
+                                      == BKException.Code.NoSuchLedgerExistsException) {
+                                cb.processResult(BKException.Code.OK, null, null);
+                            } else {
+                                LOG.error("Unable to read the ledger: {} information", ledgerId);
+                                cb.processResult(BKException.getExceptionCode(exception), null, null);
+                            }
+                        });
+                    }
+                };
+
+                ledgerManager.asyncProcessLedgers(ledgerProcessor, (rc, s, obj) -> {
+                    returnCode.set(rc);
+                    processDone.countDown();
+                }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+                processDone.await();
+                if (returnCode.get() != BKException.Code.OK) {
+                    LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
+                    throw BKException.create(returnCode.get());
+                }
+
+            } catch (Exception ioe) {
+                LOG.error("Received Exception while processing ledgers", ioe);
+                throw new UncheckedExecutionException(ioe);
+            }
+            return null;
+        });
+
+        return true;
+    }
+
+    private void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
+        System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
+        if (printMeta) {
+            System.out.println(md.toString());
+        }
+    }
+}
diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index c8c7817..4967889 100644
--- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.LedgerCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.ListFilesOnDiscCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.ReadJournalCommand;
 import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
 import org.apache.bookkeeper.tools.common.BKFlags;
@@ -54,6 +55,7 @@ public class BookieCommandGroup extends CliCommandGroup<BKFlags> {
         .addCommand(new LedgerCommand())
         .addCommand(new ListFilesOnDiscCommand())
         .addCommand(new ConvertToDBStorageCommand())
+        .addCommand(new ListLedgersCommand())
         .addCommand(new ConvertToInterleavedStorageCommand())
         .addCommand(new ReadJournalCommand())
         .build();
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java
new file mode 100644
index 0000000..9470023
--- /dev/null
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.apache.zookeeper.AsyncCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for ListLedgers command.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ListLedgersCommand.class, MetadataDrivers.class, BookkeeperInternalCallbacks.class,
+    CountDownLatch.class })
+public class ListLedgersCommandTest extends BookieCommandTestBase {
+
+    public ListLedgersCommandTest() {
+        super(3, 3);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+
+        PowerMockito.whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+
+        BookieSocketAddress bookieAddress = mock(BookieSocketAddress.class);
+        PowerMockito.whenNew(BookieSocketAddress.class).withParameterTypes(String.class).withArguments(anyString())
+            .thenReturn(bookieAddress);
+
+        PowerMockito.mockStatic(MetadataDrivers.class);
+        LedgerManagerFactory mFactory = mock(LedgerManagerFactory.class);
+        PowerMockito.doAnswer(invocationOnMock -> {
+            Function<LedgerManagerFactory, ?> function = invocationOnMock.getArgument(1);
+            function.apply(mFactory);
+            return true;
+        }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class),
+                any(Function.class));
+
+        CountDownLatch processDone = mock(CountDownLatch.class);
+        PowerMockito.whenNew(CountDownLatch.class).withArguments(anyInt())
+            .thenReturn(processDone);
+
+        LedgerManager ledgerManager = mock(LedgerManager.class);
+        when(mFactory.newLedgerManager()).thenReturn(ledgerManager);
+
+        AsyncCallback.VoidCallback callback = mock(AsyncCallback.VoidCallback.class);
+        PowerMockito.doAnswer(invocationOnMock -> {
+            processDone.countDown();
+            return null;
+        }).when(callback).processResult(anyInt(), anyString(), any());
+    }
+
+    @Test
+    public void testWithoutBookieId() {
+        testCommand("");
+    }
+
+    @Test
+    public void testWithBookieId() {
+        testCommand("-id", "1");
+    }
+
+    private void testCommand(String... args) {
+        ListLedgersCommand command = new ListLedgersCommand();
+        Assert.assertTrue(command.apply(bkFlags, args));
+    }
+}