You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2019/03/25 02:50:17 UTC
[bookkeeper] branch master updated: Issue #1987: Migrate command
`convert-to-interleaved-storage`
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4de5983 Issue #1987: Migrate command `convert-to-interleaved-storage`
4de5983 is described below
commit 4de5983793707e764c8dc604cfb6308748664677
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Mar 25 10:50:13 2019 +0800
Issue #1987: Migrate command `convert-to-interleaved-storage`
Descriptions of the changes in this PR:
- Migrate command `convert-to-interleaved-storage`
### Motivation
- #1987
- Use `bkctl` to run command `convert-to-interleaved-storage`
### Changes
- Add command in `bookiegroup`
Reviewers: Jia Zhai <zh...@apache.org>, Sijie Guo <si...@apache.org>
This closes #1988 from zymap/command-ctis, closes #1987
---
.../org/apache/bookkeeper/bookie/BookieShell.java | 94 +----------
.../bookie/ConvertToInterleavedStorageCommand.java | 183 +++++++++++++++++++++
.../tools/cli/commands/BookieCommandGroup.java | 3 +-
.../ConvertToInterleavedStorageCommandTest.java | 167 +++++++++++++++++++
4 files changed, 358 insertions(+), 89 deletions(-)
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index ded7d14..8ac9c85 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -29,7 +29,6 @@ import com.google.common.collect.Lists;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.EventLoopGroup;
@@ -74,10 +73,8 @@ import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
-import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
-import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.LocationsIndexRebuildOp;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
@@ -108,6 +105,7 @@ import org.apache.bookkeeper.replication.ReplicationException.CompatibilityExcep
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -2070,12 +2068,12 @@ public class BookieShell implements Tool {
for (File dir : ledgerDirectories) {
newCookie.writeToDirectory(dir);
}
- LOG.info("Updated cookie file present in ledgerDirectories {}", ledgerDirectories);
+ LOG.info("Updated cookie file present in ledgerDirectories {}", (Object) ledgerDirectories);
if (ledgerDirectories != indexDirectories) {
for (File dir : indexDirectories) {
newCookie.writeToDirectory(dir);
}
- LOG.info("Updated cookie file present in indexDirectories {}", indexDirectories);
+ LOG.info("Updated cookie file present in indexDirectories {}", (Object) indexDirectories);
}
}
// writes newcookie to zookeeper
@@ -2584,89 +2582,9 @@ public class BookieShell implements Tool {
@Override
int runCmd(CommandLine cmdLine) throws Exception {
- LOG.info("=== Converting DbLedgerStorage ===");
- ServerConfiguration conf = new ServerConfiguration(bkConf);
- LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
- new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
- LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
- new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
-
- DbLedgerStorage dbStorage = new DbLedgerStorage();
- InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
-
- CheckpointSource checkpointSource = new CheckpointSource() {
- @Override
- public Checkpoint newCheckpoint() {
- return Checkpoint.MAX;
- }
-
- @Override
- public void checkpointComplete(Checkpoint checkpoint, boolean compact)
- throws IOException {
- }
- };
- Checkpointer checkpointer = new Checkpointer() {
- @Override
- public void startCheckpoint(Checkpoint checkpoint) {
- // No-op
- }
-
- @Override
- public void start() {
- // no-op
- }
- };
-
- dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
- checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
- interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
- null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
- LedgerCache interleavedLedgerCache = interleavedStorage.ledgerCache;
-
- int convertedLedgers = 0;
- for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId));
- }
-
- interleavedStorage.setMasterKey(ledgerId, dbStorage.readMasterKey(ledgerId));
- if (dbStorage.isFenced(ledgerId)) {
- interleavedStorage.setFenced(ledgerId);
- }
-
- long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
- for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
- try {
- long location = dbStorage.getLocation(ledgerId, entryId);
- if (location != 0L) {
- interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
- }
- } catch (Bookie.NoEntryException e) {
- // Ignore entry
- }
- }
-
- if (++convertedLedgers % 1000 == 0) {
- LOG.info("Converted {} ledgers", convertedLedgers);
- }
- }
-
- dbStorage.shutdown();
-
- interleavedLedgerCache.flushLedger(true);
- interleavedStorage.flush();
- interleavedStorage.shutdown();
-
- String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
-
- // Rename databases and keep backup
- Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
- FileSystems.getDefault().getPath(baseDir, "ledgers.backup"));
-
- Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
- FileSystems.getDefault().getPath(baseDir, "locations.backup"));
-
- LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+ ConvertToInterleavedStorageCommand cmd = new ConvertToInterleavedStorageCommand();
+ ConvertToInterleavedStorageCommand.CTISFlags flags = new ConvertToInterleavedStorageCommand.CTISFlags();
+ cmd.apply(bkConf, flags);
return 0;
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommand.java
new file mode 100644
index 0000000..8b1183c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommand.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
+import org.apache.bookkeeper.tools.framework.CliFlags;
+import org.apache.bookkeeper.tools.framework.CliSpec;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.LedgerIdFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A command to convert bookie indexes from DbLedgerStorage to InterleavedStorage format.
+ */
+public class ConvertToInterleavedStorageCommand extends BookieCommand<ConvertToInterleavedStorageCommand.CTISFlags> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConvertToInterleavedStorageCommand.class);
+ private static final String NAME = "converttointerleavedstorage";
+ private static final String DESC = "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format";
+ private static final String NOT_INIT = "default formatter";
+
+ @Setter
+ private LedgerIdFormatter ledgerIdFormatter;
+
+ public ConvertToInterleavedStorageCommand() {
+ this(new CTISFlags());
+ }
+
+ public ConvertToInterleavedStorageCommand(CTISFlags flags) {
+ super(CliSpec.<CTISFlags>newBuilder().withName(NAME).withDescription(DESC).withFlags(flags).build());
+ }
+
+ /**
+ * Flags for this command.
+ */
+ @Accessors(fluent = true)
+ public static class CTISFlags extends CliFlags{
+
+ @Parameter(names = { "-l", "--ledgeridformatter" }, description = "Set ledger id formatter")
+ private String ledgerIdFormatter = NOT_INIT;
+
+ }
+
+ @Override
+ public boolean apply(ServerConfiguration conf, CTISFlags cmdFlags) {
+ initLedgerIdFormatter(conf, cmdFlags);
+ try {
+ return handle(conf);
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+
+ private boolean handle(ServerConfiguration bkConf) throws Exception {
+ LOG.info("=== Converting DbLedgerStorage ===");
+ ServerConfiguration conf = new ServerConfiguration(bkConf);
+ LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+ LedgerDirsManager ledgerIndexManager = new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+ new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()));
+
+ DbLedgerStorage dbStorage = new DbLedgerStorage();
+ InterleavedLedgerStorage interleavedStorage = new InterleavedLedgerStorage();
+
+ CheckpointSource checkpointSource = new CheckpointSource() {
+ @Override
+ public Checkpoint newCheckpoint() {
+ return Checkpoint.MAX;
+ }
+
+ @Override
+ public void checkpointComplete(Checkpoint checkpoint, boolean compact) {}
+ };
+ Checkpointer checkpointer = new Checkpointer() {
+ @Override
+ public void startCheckpoint(CheckpointSource.Checkpoint checkpoint) {
+ // No-op
+ }
+
+ @Override
+ public void start() {
+ // no-op
+ }
+ };
+
+ dbStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager, null,
+ checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
+ interleavedStorage.initialize(conf, null, ledgerDirsManager, ledgerIndexManager,
+ null, checkpointSource, checkpointer, NullStatsLogger.INSTANCE, PooledByteBufAllocator.DEFAULT);
+ LedgerCache interleavedLedgerCache = interleavedStorage.getLedgerCache();
+
+ int convertedLedgers = 0;
+ for (long ledgerId : dbStorage.getActiveLedgersInRange(0, Long.MAX_VALUE)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Converting ledger {}", ledgerIdFormatter.formatLedgerId(ledgerId));
+ }
+
+ interleavedStorage.setMasterKey(ledgerId, dbStorage.readMasterKey(ledgerId));
+ if (dbStorage.isFenced(ledgerId)) {
+ interleavedStorage.setFenced(ledgerId);
+ }
+
+ long lastEntryInLedger = dbStorage.getLastEntryInLedger(ledgerId);
+ for (long entryId = 0; entryId <= lastEntryInLedger; entryId++) {
+ try {
+ long location = dbStorage.getLocation(ledgerId, entryId);
+ if (location != 0L) {
+ interleavedLedgerCache.putEntryOffset(ledgerId, entryId, location);
+ }
+ } catch (Bookie.NoEntryException e) {
+ // Ignore entry
+ }
+ }
+
+ if (++convertedLedgers % 1000 == 0) {
+ LOG.info("Converted {} ledgers", convertedLedgers);
+ }
+ }
+
+ dbStorage.shutdown();
+
+ interleavedLedgerCache.flushLedger(true);
+ interleavedStorage.flush();
+ interleavedStorage.shutdown();
+
+ String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();
+
+ // Rename databases and keep backup
+ Files.move(FileSystems.getDefault().getPath(baseDir, "ledgers"),
+ FileSystems.getDefault().getPath(baseDir, "ledgers.backup"));
+
+ Files.move(FileSystems.getDefault().getPath(baseDir, "locations"),
+ FileSystems.getDefault().getPath(baseDir, "locations.backup"));
+
+ LOG.info("---- Done Converting {} ledgers ----", convertedLedgers);
+ return true;
+ }
+
+ private void initLedgerIdFormatter(ServerConfiguration conf, CTISFlags flags) {
+ if (this.ledgerIdFormatter != null) {
+ return;
+ }
+ if (flags.ledgerIdFormatter.equals(NOT_INIT)) {
+ this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(conf);
+ } else {
+ this.ledgerIdFormatter = LedgerIdFormatter.newLedgerIdFormatter(flags.ledgerIdFormatter, conf);
+ }
+ }
+}
diff --git a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
index 86129a4..9b22772 100644
--- a/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
+++ b/tools/ledger/src/main/java/org/apache/bookkeeper/tools/cli/commands/BookieCommandGroup.java
@@ -22,6 +22,7 @@ import static org.apache.bookkeeper.tools.common.BKCommandCategories.CATEGORY_IN
import org.apache.bookkeeper.tools.cli.BKCtl;
import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToDBStorageCommand;
+import org.apache.bookkeeper.tools.cli.commands.bookie.ConvertToInterleavedStorageCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.FormatCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.InitCommand;
import org.apache.bookkeeper.tools.cli.commands.bookie.LastMarkCommand;
@@ -50,7 +51,7 @@ public class BookieCommandGroup extends CliCommandGroup<BKFlags> {
.addCommand(new SanityTestCommand())
.addCommand(new LedgerCommand())
.addCommand(new ConvertToDBStorageCommand())
- .build();
+ .addCommand(new ConvertToInterleavedStorageCommand()).build();
public BookieCommandGroup() {
super(spec);
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommandTest.java
new file mode 100644
index 0000000..6849ed1
--- /dev/null
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/bookie/ConvertToInterleavedStorageCommandTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.tools.cli.commands.bookie;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.verifyNew;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import io.netty.buffer.PooledByteBufAllocator;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Vector;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.Checkpointer;
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerCache;
+import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
+import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.tools.cli.helpers.BookieCommandTestBase;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Unit test for {@link ConvertToInterleavedStorageCommand}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ ConvertToInterleavedStorageCommand.class, LedgerCache.class })
+public class ConvertToInterleavedStorageCommandTest extends BookieCommandTestBase {
+
+ private LedgerDirsManager ledgerDirsManager;
+ private DbLedgerStorage dbStorage;
+ private InterleavedLedgerStorage interleavedLedgerStorage;
+ private LedgerCache interleavedLedgerCache;
+
+ @Rule
+ private TemporaryFolder folder = new TemporaryFolder();
+
+ public ConvertToInterleavedStorageCommandTest() {
+ super(3, 0);
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+
+ createTmpFile();
+ PowerMockito.whenNew(ServerConfiguration.class).withNoArguments().thenReturn(conf);
+ PowerMockito.whenNew(ServerConfiguration.class).withParameterTypes(AbstractConfiguration.class)
+ .withArguments(eq(conf)).thenReturn(conf);
+
+ DiskChecker diskChecker = mock(DiskChecker.class);
+ whenNew(DiskChecker.class).withArguments(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())
+ .thenReturn(diskChecker);
+
+ ledgerDirsManager = mock(LedgerDirsManager.class);
+ whenNew(LedgerDirsManager.class).withParameterTypes(ServerConfiguration.class, File[].class, DiskChecker.class)
+ .withArguments(conf, conf.getLedgerDirs(), diskChecker).thenReturn(ledgerDirsManager);
+ when(ledgerDirsManager.getAllLedgerDirs()).thenReturn(getFileList());
+
+ dbStorage = mock(DbLedgerStorage.class);
+ whenNew(DbLedgerStorage.class).withNoArguments().thenReturn(dbStorage);
+ when(dbStorage.getActiveLedgersInRange(anyLong(), anyLong())).thenReturn(this::getLedgerId);
+
+ interleavedLedgerCache = mock(LedgerCache.class);
+ doNothing().when(interleavedLedgerCache).flushLedger(anyBoolean());
+
+ interleavedLedgerStorage = mock(InterleavedLedgerStorage.class);
+ whenNew(InterleavedLedgerStorage.class).withNoArguments().thenReturn(interleavedLedgerStorage);
+ doNothing().when(interleavedLedgerStorage).flush();
+ doNothing().when(interleavedLedgerStorage).shutdown();
+ when(interleavedLedgerStorage.getLedgerCache()).thenReturn(interleavedLedgerCache);
+ }
+
+ private Iterator<Long> getLedgerId() {
+ Vector<Long> longs = new Vector<>();
+ LongStream.range(0L, 10L).forEach(longs::add);
+ return longs.iterator();
+ }
+
+ private List<File> getFileList() {
+ List<File> files = new LinkedList<>();
+ files.add(folder.getRoot());
+ return files;
+ }
+
+ private void createTmpFile() {
+ try {
+ folder.newFile("ledgers");
+ folder.newFile("locations");
+ System.out.println(folder.getRoot().getAbsolutePath());
+ } catch (IOException e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testConvertToInterleavedStorageCommand() {
+ ConvertToInterleavedStorageCommand cmd = new ConvertToInterleavedStorageCommand();
+ Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
+
+ try {
+ verifyNew(ServerConfiguration.class).withArguments(eq(conf));
+ verifyNew(LedgerDirsManager.class, times(2))
+ .withArguments(eq(conf), any(File[].class), any(DiskChecker.class));
+ verifyNew(DbLedgerStorage.class, times(1)).withNoArguments();
+ verifyNew(InterleavedLedgerStorage.class, times(1)).withNoArguments();
+
+ verify(dbStorage, times(1)).initialize(eq(conf), eq(null), any(LedgerDirsManager.class),
+ any(LedgerDirsManager.class), eq(null), any(CheckpointSource.class),
+ any(Checkpointer.class), eq(NullStatsLogger.INSTANCE), eq(PooledByteBufAllocator.DEFAULT));
+ verify(interleavedLedgerStorage, times(1))
+ .initialize(eq(conf), eq(null), any(LedgerDirsManager.class),
+ any(LedgerDirsManager.class), eq(null), any(CheckpointSource.class),
+ any(Checkpointer.class), eq(NullStatsLogger.INSTANCE), eq(PooledByteBufAllocator.DEFAULT));
+ verify(dbStorage, times(1)).getActiveLedgersInRange(anyLong(), anyLong());
+ verify(dbStorage, times(10)).readMasterKey(anyLong());
+ verify(interleavedLedgerStorage, times(10)).setMasterKey(anyLong(), any());
+ verify(dbStorage, times(10)).getLastEntryInLedger(anyLong());
+ verify(dbStorage, times(10)).getLocation(anyLong(), anyLong());
+ verify(dbStorage, times(1)).shutdown();
+ verify(interleavedLedgerCache, times(1)).flushLedger(true);
+ verify(interleavedLedgerStorage, times(1)).flush();
+ verify(interleavedLedgerStorage, times(1)).shutdown();
+ } catch (Exception e) {
+ throw new UncheckedExecutionException(e.getMessage(), e);
+ }
+ }
+}