You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/04/01 16:10:44 UTC
[bookkeeper] branch master updated: Migrate command `listledgers`
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new b043d16 Migrate command `listledgers`
b043d16 is described below
commit b043d1694569a934d9f27347ec2d15180f7a95c0
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Apr 2 00:10:39 2019 +0800
Migrate command `listledgers`
Descriptions of the changes in this PR:
Using `bkctl` run command `listledgers`
### Motivation
#2004
Reviewers: Jia Zhai <zh...@apache.org>, Sijie Guo <si...@apache.org>
This closes #2005 from zymap/command-listledgers
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 79 +--------
.../cli/commands/bookie/ListLedgersCommand.java | 195 +++++++++++++++++++++
.../tools/cli/commands/BookieCommandGroup.java | 2 +
.../commands/bookie/ListLedgersCommandTest.java | 105 +++++++++++
4 files changed, 307 insertions(+), 74 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 8c448a1..9746356 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -60,11 +60,9 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -95,7 +93,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.bookkeeper.proto.BookieProtocol;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.replication.AuditorElector;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -109,6 +106,7 @@ import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListFilesOnDiscCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadJournalCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.cli.commands.bookies.InfoCommand;
@@ -143,8 +141,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
@@ -981,76 +977,11 @@ public class BookieShell implements Tool {
public int runCmd(CommandLine cmdLine) throws Exception {
final boolean printMeta = cmdLine.hasOption("m");
final String bookieidToBePartOfEnsemble = cmdLine.getOptionValue("bookieid");
- final BookieSocketAddress bookieAddress = StringUtils.isBlank(bookieidToBePartOfEnsemble) ? null
- : new BookieSocketAddress(bookieidToBePartOfEnsemble);
- runFunctionWithLedgerManagerFactory(bkConf, mFactory -> {
- try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
-
- final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
- final CountDownLatch processDone = new CountDownLatch(1);
-
- Processor<Long> ledgerProcessor = new Processor<Long>() {
- @Override
- public void process(Long ledgerId, VoidCallback cb) {
- if (!printMeta && (bookieAddress == null)) {
- printLedgerMetadata(ledgerId, null, false);
- cb.processResult(BKException.Code.OK, null, null);
- } else {
- ledgerManager.readLedgerMetadata(ledgerId).whenComplete(
- (metadata, exception) -> {
- if (exception == null) {
- if ((bookieAddress == null)
- || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie(
- ledgerId, bookieAddress, metadata.getValue())) {
- /*
- * the print method has to be in
- * synchronized scope, otherwise
- * output of printLedgerMetadata
- * could interleave since this
- * callback for different
- * ledgers can happen in
- * different threads.
- */
- synchronized (BookieShell.this) {
- printLedgerMetadata(ledgerId, metadata.getValue(),
- printMeta);
- }
- }
- cb.processResult(BKException.Code.OK, null, null);
- } else if (BKException.getExceptionCode(exception)
- == BKException.Code.NoSuchLedgerExistsException) {
- cb.processResult(BKException.Code.OK, null, null);
- } else {
- LOG.error("Unable to read the ledger: {} information", ledgerId);
- cb.processResult(BKException.getExceptionCode(exception),
- null, null);
- }
- });
- }
- }
- };
-
- ledgerManager.asyncProcessLedgers(ledgerProcessor, new AsyncCallback.VoidCallback() {
- @Override
- public void processResult(int rc, String s, Object obj) {
- returnCode.set(rc);
- processDone.countDown();
- }
- }, null, BKException.Code.OK, BKException.Code.ReadException);
-
- processDone.await();
- if (returnCode.get() != BKException.Code.OK) {
- LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
- throw BKException.create(returnCode.get());
- }
-
- } catch (Exception ioe) {
- LOG.error("Received Exception while processing ledgers", ioe);
- throw new UncheckedExecutionException(ioe);
- }
- return null;
- });
+ ListLedgersCommand.ListLedgersFlags flags = new ListLedgersCommand.ListLedgersFlags()
+ .bookieId(bookieidToBePartOfEnsemble).meta(printMeta);
+ ListLedgersCommand cmd = new ListLedgersCommand(ledgerIdFormatter);
+ cmd.apply(bkConf, flags);
return 0;
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java
new file mode 100644
index 0000000..542216f
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommand.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerManagerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import java.net.UnknownHostException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.exceptions.MetadataException;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand.ListLedgersFlags;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command for list all ledgers on the cluster.
+ */
+public class ListLedgersCommand extends BookieCommand<ListLedgersFlags> {
+
+ static final Logger LOG = LoggerFactory.getLogger(ListLedgersCommand.class);
+
+ private static final String NAME = "listledgers";
+ private static final String DESC = "List all ledgers on the cluster (this may take a long time).";
+ private static final String DEFAULT = "";
+
+ private LedgerIdFormatter ledgerIdFormatter;
+
+ public ListLedgersCommand() {
+ this(new ListLedgersFlags());
+ }
+
+ public ListLedgersCommand(LedgerIdFormatter ledgerIdFormatter) {
+ this(new ListLedgersFlags());
+ this.ledgerIdFormatter = ledgerIdFormatter;
+ }
+
+ public ListLedgersCommand(ListLedgersFlags flags) {
+ super(CliSpec.<ListLedgersFlags>newBuilder()
+ .withName(NAME)
+ .withDescription(DESC)
+ .withFlags(flags)
+ .build());
+ }
+
+ /**
+ * Flags for ListLedgers command.
+ */
+ @Accessors(fluent = true)
+ @Setter
+ public static class ListLedgersFlags extends CliFlags{
+
+ @Parameter(names = {"-m", "--meta"}, description = "Print metadata")
+ private boolean meta;
+
+ @Parameter(names = { "-id", "--bookieid" }, description = "List ledgers residing in this bookie")
+ private String bookieId;
+
+ @Parameter(names = { "-l", "--ledgerIdFormatter" }, description = "Set ledger id formatter")
+ private String ledgerIdFormatter = DEFAULT;
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
+ initLedgerFrommat(conf, cmdFlags);
+ try {
+ handler(conf, cmdFlags);
+ } catch (UnknownHostException e) {
+ System.err.println("Bookie id error");
+ return false;
+ } catch (MetadataException | ExecutionException e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+
+ return true;
+ }
+
+ private void initLedgerFrommat(ServerConfiguration conf, ListLedgersFlags cmdFlags) {
+ if (ledgerIdFormatter != null) {
+ return;
+ }
+ if (!cmdFlags.ledgerIdFormatter.equals(DEFAULT)) {
+ this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(cmdFlags.ledgerIdFormatter, conf);
+ } else {
+ this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
+ }
+ }
+
+ public boolean handler(ServerConfiguration conf, ListLedgersFlags flags)
+ throws UnknownHostException, MetadataException, ExecutionException {
+
+ final BookieSocketAddress bookieAddress = StringUtils.isBlank(flags.bookieId) ? null :
+ new BookieSocketAddress(flags.bookieId);
+
+ runFunctionWithLedgerManagerFactory(conf, mFactory -> {
+ try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
+
+ final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
+ final CountDownLatch processDone = new CountDownLatch(1);
+
+ BookkeeperInternalCallbacks.Processor<Long> ledgerProcessor = (ledgerId, cb) -> {
+ if (!flags.meta && (bookieAddress == null)) {
+ printLedgerMetadata(ledgerId, null, false);
+ cb.processResult(BKException.Code.OK, null, null);
+ } else {
+ ledgerManager.readLedgerMetadata(ledgerId).whenComplete((metadata, exception) -> {
+ if (exception == null) {
+ if ((bookieAddress == null)
+ || BookKeeperAdmin.areEntriesOfLedgerStoredInTheBookie
+ (ledgerId, bookieAddress, metadata.getValue())) {
+ /*
+ * the print method has to be in
+ * synchronized scope, otherwise
+ * output of printLedgerMetadata
+ * could interleave since this
+ * callback for different
+ * ledgers can happen in
+ * different threads.
+ */
+ synchronized (ListLedgersCommand.this) {
+ printLedgerMetadata(ledgerId, metadata.getValue(), flags.meta);
+ }
+ }
+ cb.processResult(BKException.Code.OK, null, null);
+ } else if (BKException.getExceptionCode(exception)
+ == BKException.Code.NoSuchLedgerExistsException) {
+ cb.processResult(BKException.Code.OK, null, null);
+ } else {
+ LOG.error("Unable to read the ledger: {} information", ledgerId);
+ cb.processResult(BKException.getExceptionCode(exception), null, null);
+ }
+ });
+ }
+ };
+
+ ledgerManager.asyncProcessLedgers(ledgerProcessor, (rc, s, obj) -> {
+ returnCode.set(rc);
+ processDone.countDown();
+ }, null, BKException.Code.OK, BKException.Code.ReadException);
+
+ processDone.await();
+ if (returnCode.get() != BKException.Code.OK) {
+ LOG.error("Received error return value while processing ledgers: {}", returnCode.get());
+ throw BKException.create(returnCode.get());
+ }
+
+ } catch (Exception ioe) {
+ LOG.error("Received Exception while processing ledgers", ioe);
+ throw new UncheckedExecutionException(ioe);
+ }
+ return null;
+ });
+
+ return true;
+ }
+
+ private void printLedgerMetadata(long ledgerId, LedgerMetadata md, boolean printMeta) {
+ System.out.println("ledgerID: " + ledgerIdFormatter.formatLedgerId(ledgerId));
+ if (printMeta) {
+ System.out.println(md.toString());
+ }
+ }
+}
diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index c8c7817..4967889 100644
--- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LedgerCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ListFilesOnDiscCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ListLedgersCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.ReadJournalCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.SanityTestCommand;
import org.apache.bookkeeper.tools.common.BKFlags;
@@ -54,6 +55,7 @@ public class BookieCommandGroup extends CliCommandGroup<BKFlags> {
.addCommand(new LedgerCommand())
.addCommand(new ListFilesOnDiscCommand())
.addCommand(new ConvertToDBStorageCommand())
+ .addCommand(new ListLedgersCommand())
.addCommand(new ConvertToInterleavedStorageCommand())
.addCommand(new ReadJournalCommand())
.build();
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java
new file mode 100644
index 0000000..9470023
--- /dev/null
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ListLedgersCommandTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Function;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.meta.LedgerManagerFactory;
+import org.apache.bookkeeper.meta.MetadataDrivers;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.apache.zookeeper.AsyncCallback;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for ListLedgers command.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ListLedgersCommand.class, MetadataDrivers.class, BookkeeperInternalCallbacks.class,
+ CountDownLatch.class })
+public class ListLedgersCommandTest extends BookieCommandTestBase {
+
+ public ListLedgersCommandTest() {
+ super(3, 3);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+
+ PowerMockito.whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+
+ BookieSocketAddress bookieAddress = mock(BookieSocketAddress.class);
+ PowerMockito.whenNew(BookieSocketAddress.class).withParameterTypes(String.class).withArguments(anyString())
+ .thenReturn(bookieAddress);
+
+ PowerMockito.mockStatic(MetadataDrivers.class);
+ LedgerManagerFactory mFactory = mock(LedgerManagerFactory.class);
+ PowerMockito.doAnswer(invocationOnMock -> {
+ Function<LedgerManagerFactory, ?> function = invocationOnMock.getArgument(1);
+ function.apply(mFactory);
+ return true;
+ }).when(MetadataDrivers.class, "runFunctionWithLedgerManagerFactory", any(ServerConfiguration.class),
+ any(Function.class));
+
+ CountDownLatch processDone = mock(CountDownLatch.class);
+ PowerMockito.whenNew(CountDownLatch.class).withArguments(anyInt())
+ .thenReturn(processDone);
+
+ LedgerManager ledgerManager = mock(LedgerManager.class);
+ when(mFactory.newLedgerManager()).thenReturn(ledgerManager);
+
+ AsyncCallback.VoidCallback callback = mock(AsyncCallback.VoidCallback.class);
+ PowerMockito.doAnswer(invocationOnMock -> {
+ processDone.countDown();
+ return null;
+ }).when(callback).processResult(anyInt(), anyString(), any());
+ }
+
+ @Test
+ public void testWithoutBookieId() {
+ testCommand("");
+ }
+
+ @Test
+ public void testWithBookieId() {
+ testCommand("-id", "1");
+ }
+
+ private void testCommand(String... args) {
+ ListLedgersCommand command = new ListLedgersCommand();
+ Assert.assertTrue(command.apply(bkFlags, args));
+ }
+}