You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by hs...@apache.org on 2021/05/27 23:29:45 UTC

[bookkeeper] branch master updated: Refactor book keeper cluster test case

This is an automated email from the ASF dual-hosted git repository.

hsaputra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f06a4c  Refactor book keeper cluster test case
9f06a4c is described below

commit 9f06a4ce82f61a9dc4b086650a301734332f4246
Author: Prashant Kumar <65...@users.noreply.github.com>
AuthorDate: Thu May 27 16:29:36 2021 -0700

    Refactor book keeper cluster test case
    
    ### Motivation
    BookKeeperClusterTestCase has historically exposed its members to all
    subclasses, which would then manipulate them in many ways. There was
    an array of objects for configurations, bookieServers, autorecovery,
    which implicit linking between the objects based on maps and indices.
    Individual subclasses manipulated these arrays.
    
    This makes it hard to add any dependency injection on the objects
    managed by BookKeeperClusterTestCase as the objects. To add DI, we
    need each object to have a bunch of other objects associated with
    it. For example, for each Bookie, we need to create the
    Journal. Maintaining these in separate arrays will lead to fragile
    tests.
    
    This change encapsulates all the testing objects in a per bookie
    object, and only allows manipulation through methods. This will allow
    us to group the objects needed for DI clearly.
    
    Disable testFollowBookieAddressChangeTrckingDisabled
    
    Reviewers: Henry Saputra <hs...@apache.org>, Matteo Merli <mm...@apache.org>
    
    This closes #2723 from pkumar-singh/refactor_BookKeeperClusterTestCase and squashes the following commits:
    
    47fb8121f [Prashant] Addressed code review comments
    7f410ce96 [Ivan Kelly] Encapulate members of BookKeeperClusterTestCase
    c1847af5a [Prashant Kumar] Turn Bookie into an interface
    9417b68ca [Andrey Yegorov] site update for release 4.14, this time actually updating the latest version in _config.yml (#2722)
---
 .../apache/bookkeeper/benchmark/TestBenchmark.java |   3 +-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |   4 +-
 .../cli/commands/bookies/EndpointInfoCommand.java  |   8 +-
 .../java/org/apache/bookkeeper/auth/TestAuth.java  |  19 +-
 .../bookkeeper/bookie/AdvertisedAddressTest.java   |   6 +-
 .../bookie/BookieInitializationTest.java           |   3 +-
 .../bookkeeper/bookie/BookieJournalBypassTest.java |  11 +-
 .../bookkeeper/bookie/BookieShutdownTest.java      |   6 +-
 .../bookkeeper/bookie/BookieStickyReadsTest.java   |   2 +-
 .../bookie/BookieStorageThresholdTest.java         |   5 +-
 .../bookie/BookieWriteToJournalTest.java           |   2 +-
 .../apache/bookkeeper/bookie/CompactionTest.java   | 277 +++++------
 .../org/apache/bookkeeper/bookie/CookieTest.java   |  31 +-
 .../bookie/ForceAuditorChecksCmdTest.java          |   2 +-
 .../bookie/GcOverreplicatedLedgerTest.java         |   9 +-
 .../bookkeeper/bookie/IndexCorruptionTest.java     |   4 +-
 .../bookkeeper/bookie/LedgerStorageTest.java       |  31 +-
 .../bookkeeper/bookie/UpdateCookieCmdTest.java     |  20 +-
 .../org/apache/bookkeeper/bookie/UpgradeTest.java  |  34 +-
 .../bookkeeper/client/BookKeeperAdminTest.java     |  53 +--
 .../bookkeeper/client/BookKeeperCloseTest.java     |   3 +-
 ...KeeperDiskSpaceWeightedLedgerPlacementTest.java | 161 +++----
 .../apache/bookkeeper/client/BookKeeperTest.java   |  10 +-
 .../bookkeeper/client/BookieDecommissionTest.java  |   2 +-
 .../client/BookieNetworkAddressChangeTest.java     |  18 +-
 .../bookkeeper/client/BookieRecoveryTest.java      |  56 ++-
 .../bookkeeper/client/BookieWriteLedgerTest.java   |   3 +-
 .../apache/bookkeeper/client/LedgerCloseTest.java  |   6 +-
 .../apache/bookkeeper/client/LedgerCmdTest.java    |  14 +-
 .../bookkeeper/client/LedgerRecoveryTest.java      |  24 +-
 .../apache/bookkeeper/client/MdcContextTest.java   |   4 +-
 .../client/ParallelLedgerRecoveryTest.java         |   3 +-
 .../bookkeeper/client/TestBookieWatcher.java       |   4 +-
 .../bookkeeper/client/TestDelayEnsembleChange.java |  41 +-
 .../client/TestDisableEnsembleChange.java          |   3 +-
 .../client/TestReadLastConfirmedAndEntry.java      |   9 +-
 .../client/TestReadLastConfirmedLongPoll.java      |   3 +-
 .../client/TestTryReadLastConfirmed.java           |   3 +-
 .../bookkeeper/client/UpdateLedgerCmdTest.java     |   8 +-
 .../bookkeeper/client/UpdateLedgerOpTest.java      |   5 +-
 .../bookkeeper/proto/BookieBackpressureTest.java   | 134 +++---
 .../bookkeeper/proto/NetworkLessBookieTest.java    |   4 +-
 .../bookkeeper/proto/TestBackwardCompatCMS42.java  |   5 +-
 .../proto/TestPerChannelBookieClient.java          |   3 +-
 .../bookkeeper/replication/AuditorBookieTest.java  |  41 +-
 .../replication/AuditorLedgerCheckerTest.java      |  62 +--
 .../AuditorPeriodicBookieCheckTest.java            |   6 +-
 .../replication/AuditorPeriodicCheckTest.java      |  40 +-
 .../AuditorPlacementPolicyCheckTest.java           |  14 +-
 .../replication/AuditorReplicasCheckTest.java      |   9 +-
 .../replication/AuditorRollingRestartTest.java     |   2 +-
 .../replication/AuthAutoRecoveryTest.java          |   2 +-
 .../replication/AutoRecoveryMainTest.java          |  20 +-
 .../replication/BookieAutoRecoveryTest.java        |  44 +-
 .../replication/BookieLedgerIndexTest.java         |  11 +-
 .../replication/TestReplicationWorker.java         |   6 +-
 .../bookkeeper/sasl/GSSAPIBookKeeperTest.java      |  17 +-
 .../bookkeeper/sasl/MD5DigestBookKeeperTest.java   |  17 +-
 .../bookkeeper/server/http/TestHttpService.java    |   9 +-
 .../server/http/service/ListLedgerServiceTest.java |   2 +-
 .../bookkeeper/test/BookKeeperClusterTestCase.java | 511 +++++++++++----------
 .../apache/bookkeeper/test/BookieClientTest.java   |   1 +
 .../apache/bookkeeper/test/BookieFailureTest.java  |  31 +-
 .../bookkeeper/test/BookieJournalRollingTest.java  |  30 +-
 .../bookkeeper/test/ForceReadOnlyBookieTest.java   |   8 +-
 .../apache/bookkeeper/test/LedgerDeleteTest.java   |   4 +-
 .../bookkeeper/test/LocalBookiesRegistryTest.java  |   8 +-
 .../apache/bookkeeper/test/ReadOnlyBookieTest.java |  39 +-
 .../java/org/apache/bookkeeper/tls/TestTLS.java    | 147 +++---
 .../java/org/apache/bookkeeper/util/TestUtils.java |   5 +
 70 files changed, 1024 insertions(+), 1118 deletions(-)

diff --git a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
index bb1e37c..9813a97 100644
--- a/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
+++ b/bookkeeper-benchmark/src/test/java/org/apache/bookkeeper/benchmark/TestBenchmark.java
@@ -68,7 +68,8 @@ public class TestBenchmark extends BookKeeperClusterTestCase {
 
     @Test
     public void testBookie() throws Exception {
-        BookieSocketAddress bookie = getBookieAddress(0);
+        BookieSocketAddress bookie = serverByIndex(0).getLocalAddress();
+
         BenchBookie.main(new String[] {
                 "--host", bookie.getSocketAddress().getHostName(),
                 "--port", String.valueOf(bookie.getPort()),
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 4f73374..378df4b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -225,10 +225,10 @@ public class BookKeeperAdmin implements AutoCloseable {
     }
 
     @SneakyThrows
-    public BookieServiceInfo getBookieServiceInfo(String bookiedId)
+    public BookieServiceInfo getBookieServiceInfo(BookieId bookiedId)
             throws BKException {
         return FutureUtils.result(bkc.getMetadataClientDriver()
-                .getRegistrationClient().getBookieServiceInfo(BookieId.parse(bookiedId))).getValue();
+                .getRegistrationClient().getBookieServiceInfo(bookiedId)).getValue();
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/EndpointInfoCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/EndpointInfoCommand.java
index 9ebbad4..fc5f199 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/EndpointInfoCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookies/EndpointInfoCommand.java
@@ -80,13 +80,13 @@ public class EndpointInfoCommand extends BookieCommand<EndpointInfoCommand.Endpo
         ClientConfiguration adminConf = new ClientConfiguration(conf);
         BookKeeperAdmin admin = new BookKeeperAdmin(adminConf);
         try {
-            final String bookieId = flags.bookie;
-            if (bookieId == null || bookieId.isEmpty()) {
+            final String bookieIdStr = flags.bookie;
+            if (bookieIdStr == null || bookieIdStr.isEmpty()) {
                 throw new IllegalArgumentException("BookieId is required");
             }
-            BookieId address = BookieId.parse(bookieId);
+            BookieId bookieId = BookieId.parse(bookieIdStr);
             Collection<BookieId> allBookies = admin.getAllBookies();
-            if (!allBookies.contains(address)) {
+            if (!allBookies.contains(bookieId)) {
                 LOG.info("Bookie " + bookieId + " does not exist, only " + allBookies);
                 return false;
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
index c435d99..713dcbf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/auth/TestAuth.java
@@ -110,14 +110,14 @@ public class TestAuth extends BookKeeperClusterTestCase {
     private int entryCount(long ledgerId, ServerConfiguration bookieConf,
                            ClientConfiguration clientConf) throws Exception {
         LOG.info("Counting entries in {}", ledgerId);
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setBookieAuthProviderFactoryClass(
-                    AlwaysSucceedBookieAuthProviderFactory.class.getName());
-        }
         clientConf.setClientAuthProviderFactoryClass(
                 SendUntilCompleteClientAuthProviderFactory.class.getName());
 
-        restartBookies();
+        restartBookies(c -> {
+                c.setBookieAuthProviderFactoryClass(
+                        AlwaysSucceedBookieAuthProviderFactory.class.getName());
+                return c;
+            });
 
         int count = 0;
         try (BookKeeper bkc = new BookKeeper(clientConf, zkc);
@@ -184,9 +184,7 @@ public class TestAuth extends BookKeeperClusterTestCase {
         assertFalse(ledgerId.get() == -1);
         assertEquals("Should have entry", 1, entryCount(ledgerId.get(), bookieConf, clientConf));
 
-        for (BookieServer bks : bs) {
-            bks.shutdown();
-        }
+        stopAllBookies();
 
         assertEquals(LogCloseCallsBookieAuthProviderFactory.initCountersOnConnections.get(),
             LogCloseCallsBookieAuthProviderFactory.closeCountersOnConnections.get());
@@ -481,10 +479,7 @@ public class TestAuth extends BookKeeperClusterTestCase {
     }
 
     BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
-        bsConfs.add(conf);
-        BookieServer s = startBookie(conf);
-        bs.add(s);
-        return s;
+        return startAndAddBookie(conf).getServer();
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/AdvertisedAddressTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/AdvertisedAddressTest.java
index 70619fc..7e269a5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/AdvertisedAddressTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/AdvertisedAddressTest.java
@@ -22,11 +22,9 @@
 package org.apache.bookkeeper.bookie;
 
 import static org.junit.Assert.assertEquals;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
-
 import java.util.UUID;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -34,7 +32,6 @@ import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.PortManager;
 import org.junit.Test;
 
@@ -49,11 +46,10 @@ public class AdvertisedAddressTest extends BookKeeperClusterTestCase {
     }
 
     private String newDirectory(boolean createCurDir) throws IOException {
-        File d = IOUtils.createTempDir("cookie", "tmpdir");
+        File d = createTempDir("cookie", "tmpdir");
         if (createCurDir) {
             new File(d, "current").mkdirs();
         }
-        tmpDirs.add(d);
         return d.getPath();
     }
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 7f4257b..11af989 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -317,7 +317,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         bkServer.start();
         bkServer.join();
         assertEquals("Failed to return ExitCode.ZK_REG_FAIL",
-                ExitCode.ZK_REG_FAIL, bkServer.getExitCode());
+                     ExitCode.ZK_REG_FAIL, bkServer.getExitCode());
     }
 
     @Test
@@ -433,7 +433,6 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
 
         final BookieId bookieId = BookieId.parse(InetAddress.getLocalHost().getCanonicalHostName().split("\\.", 2)[0]
             + ":" + conf.getBookiePort());
-
         driver.initialize(conf, () -> {}, NullStatsLogger.INSTANCE);
         try (StateManager manager = new BookieStateManager(conf, driver)) {
             manager.registerBookie(true).get();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalBypassTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalBypassTest.java
index fe20322..fa5d563 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalBypassTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalBypassTest.java
@@ -27,7 +27,6 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.junit.Test;
 
@@ -44,7 +43,7 @@ public class BookieJournalBypassTest extends BookKeeperClusterTestCase {
     }
 
     @Override
-    protected BookieServer startBookie(ServerConfiguration conf) throws Exception {
+    protected ServerTester startBookie(ServerConfiguration conf) throws Exception {
         if (bookieIdx++ == 0) {
             // First bookie will have the journal disabled
             conf.setJournalWriteData(false);
@@ -56,13 +55,13 @@ public class BookieJournalBypassTest extends BookKeeperClusterTestCase {
     public void testJournalBypass() throws Exception {
         ClientConfiguration conf = new ClientConfiguration(baseClientConf);
 
-        BookieImpl bookieImpl = (BookieImpl) bs.get(0).getBookie();
+        BookieImpl bookieImpl = (BookieImpl) serverByIndex(0).getBookie();
         Journal journal0 = bookieImpl.journals.get(0);
-        LedgerStorage ls0 = bs.get(0).getBookie().getLedgerStorage();
+        LedgerStorage ls0 = serverByIndex(0).getBookie().getLedgerStorage();
 
-        bookieImpl = (BookieImpl) bs.get(1).getBookie();
+        bookieImpl = (BookieImpl) serverByIndex(1).getBookie();
         Journal journal1 = bookieImpl.journals.get(0);
-        LedgerStorage ls1 = bs.get(1).getBookie().getLedgerStorage();
+        LedgerStorage ls1 = serverByIndex(1).getBookie().getLedgerStorage();
 
         ls0.flush();
         ls1.flush();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShutdownTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShutdownTest.java
index e5f3a59..92d57ae 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShutdownTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieShutdownTest.java
@@ -84,9 +84,7 @@ public class BookieShutdownTest extends BookKeeperClusterTestCase {
                         + " and now going to fail bookie.");
                 // Shutdown one Bookie server and restarting new one to continue
                 // writing
-                bsConfs.remove(0);
-                bs.get(0).shutdown();
-                bs.remove(0);
+                killBookie(0);
                 startNewBookie();
                 LOG.info("Shutdown one bookie server and started new bookie server...");
             } catch (BKException e) {
@@ -118,7 +116,7 @@ public class BookieShutdownTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieShutdownFromBookieThread() throws Exception {
-        ServerConfiguration conf = bsConfs.get(0);
+        ServerConfiguration conf = confByIndex(0);
         killBookie(0);
         final CountDownLatch latch = new CountDownLatch(1);
         final CountDownLatch shutdownComplete = new CountDownLatch(1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
index a23b0e5..1046dea 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStickyReadsTest.java
@@ -153,7 +153,7 @@ public class BookieStickyReadsTest extends BookKeeperClusterTestCase {
 
         // Suspend the sticky bookie. Reads should now go to a different sticky
         // bookie
-        bs.get(bookieWithRequests).suspendProcessing();
+        serverByIndex(bookieWithRequests).suspendProcessing();
 
         for (int i = 0; i < n; i++) {
             @Cleanup
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
index 6c4cdb3..ad19d99 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieStorageThresholdTest.java
@@ -154,9 +154,8 @@ public class BookieStorageThresholdTest extends BookKeeperClusterTestCase {
         };
         conf.setLedgerDirNames(ledgerDirNames);
         conf.setJournalDirName(journalDir.getPath());
-        BookieServer server = startBookie(conf);
-        bs.add(server);
-        bsConfs.add(conf);
+
+        BookieServer server = startAndAddBookie(conf).getServer();
         BookieImpl bookie = (BookieImpl) server.getBookie();
         // since we are going to set dependency injected dirsMonitor, so we need to shutdown
         // the dirsMonitor which was created as part of the initialization of Bookie
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
index 597ca77..81cd636 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieWriteToJournalTest.java
@@ -29,7 +29,6 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.whenNew;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.File;
@@ -80,6 +79,7 @@ public class BookieWriteToJournalTest {
     /**
      * test that Bookie calls correctly Journal.logAddEntry about "ackBeforeSync" parameter.
      */
+
     @Test
     public void testJournalLogAddEntryCalledCorrectly() throws Exception {
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 5152040..408446d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -142,9 +142,9 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         super.setUp();
     }
 
-    private GarbageCollectorThread getGCThread() {
-        assertEquals(1, bs.size());
-        BookieServer server = bs.get(0);
+    private GarbageCollectorThread getGCThread() throws Exception {
+        assertEquals(1, bookieCount());
+        BookieServer server = serverByIndex(0);
         return ((InterleavedLedgerStorage) server.getBookie().getLedgerStorage()).gcThread;
     }
 
@@ -198,11 +198,12 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         LedgerHandle[] lhs = prepareData(3, false);
 
         // disable compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setMajorCompactionThreshold(0.0f);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                c.setMinorCompactionThreshold(0.0f);
+                c.setMajorCompactionThreshold(0.0f);
+                return c;
+            });
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
@@ -222,7 +223,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
 
         // entry logs ([0,1].log) should not be compacted.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue("Not Found entry log file ([0,1].log that should have been compacted in ledgerDirectory: "
                             + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1));
         }
@@ -233,12 +234,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
 
-        baseConf.setForceAllowCompaction(true);
-        baseConf.setMajorCompactionThreshold(0.5f);
-        baseConf.setMinorCompactionThreshold(0.2f);
-        baseConf.setMajorCompactionInterval(0);
-        baseConf.setMinorCompactionInterval(0);
-        restartBookies(baseConf);
+        restartBookies(c -> {
+            c.setForceAllowCompaction(true);
+            c.setMajorCompactionThreshold(0.5f);
+            c.setMinorCompactionThreshold(0.2f);
+            c.setMajorCompactionInterval(0);
+            c.setMinorCompactionInterval(0);
+            return c;
+        });
 
         assertFalse(getGCThread().enableMajorCompaction);
         assertFalse(getGCThread().enableMinorCompaction);
@@ -329,14 +332,16 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable major compaction
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable major compaction
+                c.setMajorCompactionThreshold(0.0f);
+                c.setGcWaitTime(60000);
+                c.setMinorCompactionInterval(120000);
+                c.setMajorCompactionInterval(240000);
+                return c;
+            });
+
 
         getGCThread().enableForceGC();
         getGCThread().triggerGC().get();
@@ -369,7 +374,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
 
         // entry logs ([0,1,2].log) should be compacted.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
                     + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -400,17 +405,18 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         }
 
         // disable major compaction
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
-        // Setup limit on compaction duration.
-        baseConf.setMinorCompactionMaxTimeMillis(15);
-        baseConf.setMajorCompactionMaxTimeMillis(15);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c-> {
+            c.setMajorCompactionThreshold(0.0f);
+            c.setGcWaitTime(60000);
+            c.setMinorCompactionInterval(120000);
+            c.setMajorCompactionInterval(240000);
+
+            // Setup limit on compaction duration.
+            c.setMinorCompactionMaxTimeMillis(15);
+            c.setMajorCompactionMaxTimeMillis(15);
+            return c;
+        });
 
         getGCThread().enableForceGC();
         getGCThread().triggerGC().get();
@@ -476,15 +482,15 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable major compaction
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(-1);
-        baseConf.setMajorCompactionInterval(-1);
-        baseConf.setForceAllowCompaction(true);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c-> {
+            c.setMajorCompactionThreshold(0.0f);
+            c.setGcWaitTime(60000);
+            c.setMinorCompactionInterval(-1);
+            c.setMajorCompactionInterval(-1);
+            c.setForceAllowCompaction(true);
+            return c;
+        });
 
         getGCThread().enableForceGC();
         getGCThread().triggerGC().get();
@@ -547,22 +553,23 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable major compaction
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable major compaction
+                c.setMajorCompactionThreshold(0.0f);
+                c.setGcWaitTime(60000);
+                c.setMinorCompactionInterval(120000);
+                c.setMajorCompactionInterval(240000);
+                return c;
+            });
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
         assertFalse(getGCThread().enableMajorCompaction);
         assertTrue(getGCThread().enableMinorCompaction);
 
-        for (BookieServer bookieServer : bs) {
-            BookieImpl bookie = ((BookieImpl) bookieServer.getBookie());
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieImpl bookie = ((BookieImpl) serverByIndex(i).getBookie());
             LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
             List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
             // if all the discs are full then Major and Minor compaction would be disabled since
@@ -584,7 +591,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertEquals(lastMinorCompactionTime, getGCThread().lastMinorCompactionTime);
 
         // entry logs ([0,1,2].log) should still remain, because both major and Minor compaction are disabled.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue(
                     "All the entry log files ([0,1,2].log are not available, which is not expected" + ledgerDirectory,
                     TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1, 2));
@@ -615,9 +622,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         };
         conf.setLedgerDirNames(ledgerDirNames);
         conf.setJournalDirName(journalDir.getPath());
-        BookieServer server = startBookie(conf);
-        bs.add(server);
-        bsConfs.add(conf);
+        BookieServer server = startAndAddBookie(conf).getServer();
         // prepare data
         LedgerHandle[] lhs = prepareData(3, false);
 
@@ -630,8 +635,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertFalse(getGCThread().enableMajorCompaction);
         assertTrue(getGCThread().enableMinorCompaction);
 
-        for (BookieServer bookieServer : bs) {
-            BookieImpl bookie = ((BookieImpl) bookieServer.getBookie());
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieImpl bookie = ((BookieImpl) serverByIndex(i).getBookie());
             bookie.getLedgerStorage().flush();
             bookie.dirsMonitor.shutdown();
             LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
@@ -668,8 +673,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         verifyLedger(lhs[0].getId(), 0, lhs[0].getLastAddConfirmed());
 
         // for the sake of validity of test lets make sure that there is no writableLedgerDir in the bookies
-        for (BookieServer bookieServer : bs) {
-            BookieImpl bookie = (BookieImpl) bookieServer.getBookie();
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieImpl bookie = (BookieImpl) serverByIndex(i).getBookie();
             LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
             try {
                 List<File> ledgerDirs = ledgerDirsManager.getWritableLedgerDirs();
@@ -691,14 +696,15 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable minor compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable minor compaction
+                c.setMinorCompactionThreshold(0.0f);
+                c.setGcWaitTime(60000);
+                c.setMinorCompactionInterval(120000);
+                c.setMajorCompactionInterval(240000);
+                return c;
+            });
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
@@ -717,7 +723,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
 
         // entry logs ([0,1,2].log) should be compacted
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
                       + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -737,15 +743,16 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable minor compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(-1);
-        baseConf.setMajorCompactionInterval(-1);
-        baseConf.setForceAllowCompaction(true);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c-> {
+            // disable minor compaction
+            c.setMinorCompactionThreshold(0.0f);
+            c.setGcWaitTime(60000);
+            c.setMinorCompactionInterval(-1);
+            c.setMajorCompactionInterval(-1);
+            c.setForceAllowCompaction(true);
+            return c;
+        });
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
@@ -795,14 +802,15 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable minor compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable minor compaction
+                c.setMinorCompactionThreshold(0.0f);
+                c.setGcWaitTime(60000);
+                c.setMinorCompactionInterval(120000);
+                c.setMajorCompactionInterval(240000);
+                return c;
+            });
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
@@ -821,7 +829,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
 
         // entry logs ([0,1,2].log) should be compacted
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
                     + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -837,7 +845,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
          * there is only one bookie in the cluster so we should be able to read
          * entries from this bookie.
          */
-        ServerConfiguration bookieServerConfig = ((BookieImpl) bs.get(0).getBookie()).conf;
+        ServerConfiguration bookieServerConfig = ((BookieImpl) serverByIndex(0).getBookie()).conf;
         ServerConfiguration newBookieConf = new ServerConfiguration(bookieServerConfig);
         /*
          * by reusing bookieServerConfig and setting metadataServiceUri to null
@@ -867,8 +875,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
          * for this test scenario we are assuming that there will be only one
          * bookie in the cluster
          */
-        assertEquals("Numbers of Bookies in this cluster", 1, bsConfs.size());
-        ServerConfiguration serverConfig = bsConfs.get(0);
+        assertEquals("Numbers of Bookies in this cluster", 1, bookieCount());
+        ServerConfiguration serverConfig = confByIndex(0);
         File ledgerDir = serverConfig.getLedgerDirs()[0];
         assertEquals("Number of Ledgerdirs for this bookie", 1, serverConfig.getLedgerDirs().length);
         assertTrue("indexdirs should be configured to null", null == serverConfig.getIndexDirs());
@@ -884,7 +892,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        bs.get(0).getBookie().getLedgerStorage().flush();
+        serverByIndex(0).getBookie().getLedgerStorage().flush();
         assertTrue(
                 "entry log file ([0,1,2].log should be available in ledgerDirectory: "
                         + serverConfig.getLedgerDirs()[0],
@@ -893,18 +901,6 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         long usableSpace = ledgerDir.getUsableSpace();
         long totalSpace = ledgerDir.getTotalSpace();
 
-        baseConf.setForceReadOnlyBookie(true);
-        baseConf.setIsForceGCAllowWhenNoSpace(true);
-        // disable minor compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(60000);
-        baseConf.setMinorCompactionInterval(120000);
-        baseConf.setMajorCompactionInterval(240000);
-        baseConf.setMinUsableSizeForEntryLogCreation(1);
-        baseConf.setMinUsableSizeForIndexFileCreation(1);
-        baseConf.setDiskUsageThreshold((1.0f - ((float) usableSpace / (float) totalSpace)) * 0.9f);
-        baseConf.setDiskUsageWarnThreshold(0.0f);
-
         /*
          * because of the value set for diskUsageThreshold, when bookie is
          * restarted it wouldn't find any writableledgerdir. But we have set
@@ -914,10 +910,23 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
          */
 
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                c.setForceReadOnlyBookie(true);
+                c.setIsForceGCAllowWhenNoSpace(true);
+                // disable minor compaction
+                c.setMinorCompactionThreshold(0.0f);
+                c.setGcWaitTime(60000);
+                c.setMinorCompactionInterval(120000);
+                c.setMajorCompactionInterval(240000);
+                c.setMinUsableSizeForEntryLogCreation(1);
+                c.setMinUsableSizeForIndexFileCreation(1);
+                c.setDiskUsageThreshold((1.0f - ((float) usableSpace / (float) totalSpace)) * 0.9f);
+                c.setDiskUsageWarnThreshold(0.0f);
+                return c;
+            });
 
         assertFalse("There shouldn't be any writable ledgerDir",
-                    ((BookieImpl) bs.get(0).getBookie()).getLedgerDirsManager().hasWritableLedgerDirs());
+                    ((BookieImpl) serverByIndex(0).getBookie()).getLedgerDirsManager().hasWritableLedgerDirs());
 
         long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
         long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
@@ -941,7 +950,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
          */
 
         // entry logs ([0,1,2].log) should be compacted
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
                     + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -980,7 +989,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
 
         // entry logs ([0,1,2].log) should not be compacted
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue("Not Found entry log file ([1,2].log that should have been compacted in ledgerDirectory: "
                      + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1, 2));
         }
@@ -1018,7 +1027,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
 
         // entry logs (0.log) should not be compacted
         // entry logs ([1,2,3].log) should be compacted.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue("Not Found entry log file ([0].log that should have been compacted in ledgerDirectory: "
                      + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0));
             assertFalse("Found entry log file ([1,2,3].log that should have not been compacted in ledgerDirectory: "
@@ -1272,10 +1281,12 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
      * Test extractMetaFromEntryLogs optimized method to avoid excess memory usage.
      */
     public void testExtractMetaFromEntryLogs() throws Exception {
-        // Always run this test with Throttle enabled.
-        baseConf.setIsThrottleByBytes(true);
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // Always run this test with Throttle enabled.
+                c.setIsThrottleByBytes(true);
+                return c;
+            });
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         File tmpDir = createTempDir("bkTest", ".dir");
         File curDir = BookieImpl.getCurrentDirectory(tmpDir);
@@ -1470,15 +1481,18 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setGcWaitTime(600000);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable compaction
+                c.setMinorCompactionThreshold(0.0f);
+                c.setMajorCompactionThreshold(0.0f);
+                c.setGcWaitTime(600000);
+                return c;
+            });
+
 
-        BookieImpl bookie = ((BookieImpl) bs.get(0).getBookie());
+
+        BookieImpl bookie = ((BookieImpl) serverByIndex(0).getBookie());
         InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage;
 
         // remove ledger2 and ledger3
@@ -1496,7 +1510,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         }
 
         // entry logs ([0,1,2].log) should not be compacted because of partial flush throw IOException
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -1514,7 +1528,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         assertEquals(findCompactedEntryLogFiles().size(), 0);
 
         // compaction worker should recover partial flushed index and delete [0,1,2].log
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2));
         }
@@ -1533,13 +1547,14 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             lh.close();
         }
 
-        // disable compaction
-        baseConf.setMinorCompactionThreshold(0.0f);
-        baseConf.setMajorCompactionThreshold(0.0f);
-        baseConf.setUseTransactionalCompaction(true);
-
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // disable compaction
+                c.setMinorCompactionThreshold(0.0f);
+                c.setMajorCompactionThreshold(0.0f);
+                c.setUseTransactionalCompaction(true);
+                return c;
+            });
 
         // remove ledger2 and ledger3
         bkc.deleteLedger(lhs[1].getId());
@@ -1548,7 +1563,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         LOG.info("Finished deleting the ledgers contains most entries.");
         Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
             + baseConf.getGcWaitTime());
-        BookieImpl bookie = (BookieImpl) bs.get(0).getBookie();
+        BookieImpl bookie = (BookieImpl) serverByIndex(0).getBookie();
         InterleavedLedgerStorage storage = (InterleavedLedgerStorage) bookie.ledgerStorage;
 
         List<File> ledgerDirs = bookie.getLedgerDirsManager().getAllLedgerDirs();
@@ -1564,7 +1579,7 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         }
 
         // entry logs ([0-4].log) should not be compacted because of failure in flush compaction log
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertTrue("Entry log file ([0,1,2].log should not be compacted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4));
         }
@@ -1580,16 +1595,18 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             assertEquals(usageBeforeCompaction.get(i), freeSpaceAfterCompactionFailed.get(i));
         }
 
-        // now enable normal compaction
-        baseConf.setMajorCompactionThreshold(0.5f);
 
         // restart bookies
-        restartBookies(baseConf);
+        restartBookies(c -> {
+                // now enable normal compaction
+                c.setMajorCompactionThreshold(0.5f);
+                return c;
+            });
 
-        Thread.sleep(baseConf.getMajorCompactionInterval() * 1000
-            + baseConf.getGcWaitTime());
+        Thread.sleep(confByIndex(0).getMajorCompactionInterval() * 1000
+                + confByIndex(0).getGcWaitTime());
         // compaction worker should compact [0-4].log
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Entry log file ([0,1,2].log should have been compacted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1, 2, 3, 4));
         }
@@ -1607,9 +1624,9 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         return size;
     }
 
-    private Set<File> findCompactedEntryLogFiles() {
+    private Set<File> findCompactedEntryLogFiles() throws Exception {
         Set<File> compactedLogFiles = new HashSet<>();
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             File[] files = BookieImpl.getCurrentDirectory(ledgerDirectory).listFiles(
                 file -> file.getName().endsWith(COMPACTED_SUFFIX));
             if (files != null) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
index e551495..059587b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CookieTest.java
@@ -21,14 +21,13 @@
 
 package org.apache.bookkeeper.bookie;
 
-import static org.apache.bookkeeper.bookie.UpgradeTest.newV1JournalDirectory;
-import static org.apache.bookkeeper.bookie.UpgradeTest.newV1LedgerDirectory;
-import static org.apache.bookkeeper.bookie.UpgradeTest.newV2JournalDirectory;
-import static org.apache.bookkeeper.bookie.UpgradeTest.newV2LedgerDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.initV1JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.initV1LedgerDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.initV2JournalDirectory;
+import static org.apache.bookkeeper.bookie.UpgradeTest.initV2LedgerDirectory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.IOException;
@@ -46,7 +45,6 @@ import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.PortManager;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Version;
@@ -69,11 +67,10 @@ public class CookieTest extends BookKeeperClusterTestCase {
     }
 
     private String newDirectory(boolean createCurDir) throws IOException {
-        File d = IOUtils.createTempDir("cookie", "tmpdir");
+        File d = createTempDir("cookie", "tmpdir");
         if (createCurDir) {
             new File(d, "current").mkdirs();
         }
-        tmpDirs.add(d);
         return d.getPath();
     }
 
@@ -547,10 +544,8 @@ public class CookieTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testV2data() throws Exception {
-        File journalDir = newV2JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV2LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV2JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV2LedgerDirectory(createTempDir("bookie", "ledger"));
 
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(journalDir.getPath())
@@ -572,10 +567,8 @@ public class CookieTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testV1data() throws Exception {
-        File journalDir = newV1JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV1LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV1JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV1LedgerDirectory(createTempDir("bookie", "ledger"));
 
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(journalDir.getPath())
@@ -678,10 +671,8 @@ public class CookieTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testV2dataWithHostNameAsBookieID() throws Exception {
-        File journalDir = newV2JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV2LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV2JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV2LedgerDirectory(createTempDir("bookie", "ledger"));
 
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(journalDir.getPath())
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
index 4c0479a..ad96129 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/ForceAuditorChecksCmdTest.java
@@ -51,7 +51,7 @@ public class ForceAuditorChecksCmdTest extends BookKeeperClusterTestCase {
         String[] argv = new String[] { "forceauditchecks", "-calc", "-ppc", "-rc" };
         long curTime = System.currentTimeMillis();
 
-        final ServerConfiguration conf = bsConfs.get(0);
+        final ServerConfiguration conf = confByIndex(0);
         BookieShell bkShell = new BookieShell();
         bkShell.setConf(conf);
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
index d5b454b..35dee87 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GcOverreplicatedLedgerTest.java
@@ -22,9 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import com.google.common.collect.Lists;
-
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -41,7 +39,6 @@ import org.apache.bookkeeper.meta.LedgerManagerTestCase;
 import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.zookeeper.ZooDefs;
@@ -182,11 +179,9 @@ public class GcOverreplicatedLedgerTest extends LedgerManagerTestCase {
         Assert.assertTrue(activeLedgers.containsKey(lh.getId()));
     }
 
-    private BookieId getBookieNotInEnsemble(LedgerMetadata ledgerMetadata) throws UnknownHostException {
+    private BookieId getBookieNotInEnsemble(LedgerMetadata ledgerMetadata) throws Exception {
         List<BookieId> allAddresses = Lists.newArrayList();
-        for (BookieServer bk : bs) {
-            allAddresses.add(bk.getBookieId());
-        }
+        allAddresses.addAll(bookieAddresses());
         SortedMap<Long, ? extends List<BookieId>> ensembles = ledgerMetadata.getAllEnsembles();
         for (List<BookieId> fragmentEnsembles : ensembles.values()) {
             allAddresses.removeAll(fragmentEnsembles);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
index 8eeec6e..e1425de 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexCorruptionTest.java
@@ -55,7 +55,7 @@ public class IndexCorruptionTest extends BookKeeperClusterTestCase {
     public void testNoSuchLedger() throws Exception {
         LOG.debug("Testing NoSuchLedger");
 
-        SyncThread syncThread = ((BookieImpl) bs.get(0).getBookie()).syncThread;
+        SyncThread syncThread = ((BookieImpl) serverByIndex(0).getBookie()).syncThread;
         syncThread.suspendSync();
         // Create a ledger
         LedgerHandle lh = bkc.createLedger(1, 1, digestType, "".getBytes());
@@ -96,7 +96,7 @@ public class IndexCorruptionTest extends BookKeeperClusterTestCase {
     public void testEmptyIndexPage() throws Exception {
         LOG.debug("Testing EmptyIndexPage");
 
-        SyncThread syncThread = ((BookieImpl) bs.get(0).getBookie()).syncThread;
+        SyncThread syncThread = ((BookieImpl) serverByIndex(0).getBookie()).syncThread;
         assertNotNull("Not found SyncThread.", syncThread);
 
         syncThread.suspendSync();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
index f5f2c2d..ce0be74 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerStorageTest.java
@@ -54,7 +54,7 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
 
     @Test
     public void testLedgerDeleteNotification() throws Exception {
-        LedgerStorage ledgerStorage = bs.get(0).getBookie().getLedgerStorage();
+        LedgerStorage ledgerStorage = serverByIndex(0).getBookie().getLedgerStorage();
 
         long deletedLedgerId = 5;
         ledgerStorage.setMasterKey(deletedLedgerId, new byte[0]);
@@ -92,11 +92,12 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
 
     public void testExplicitLacWriteToJournal(int journalFormatVersionToWrite, int fileInfoFormatVersionToWrite)
             throws Exception {
-        ServerConfiguration bookieServerConfig = bsConfs.get(0);
-        bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
-        bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
+        restartBookies(c -> {
+                c.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
+                c.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
+                return c;
+            });
 
-        restartBookies(bookieServerConfig);
 
         ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
         confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -133,7 +134,7 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
         assertEquals("Read explicit LAC of rlh after wait for explicitlacflush", (numOfEntries - 1),
                 readExplicitLastConfirmed);
 
-        ServerConfiguration newBookieConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration newBookieConf = new ServerConfiguration(confByIndex(0));
         /*
          * by reusing bookieServerConfig and setting metadataServiceUri to null
          * we can create/start new Bookie instance using the same data
@@ -182,11 +183,11 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
 
     public void testExplicitLacWriteToFileInfo(int journalFormatVersionToWrite, int fileInfoFormatVersionToWrite)
             throws Exception {
-        ServerConfiguration bookieServerConfig = bsConfs.get(0);
-        bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
-        bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
-
-        restartBookies(bookieServerConfig);
+        restartBookies(c -> {
+                c.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
+                c.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);
+                return c;
+            });
 
         ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
         confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -226,10 +227,10 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
         /*
          * flush ledgerStorage so that header of fileinfo is flushed.
          */
-        bs.get(0).getBookie().getLedgerStorage().flush();
+        serverByIndex(0).getBookie().getLedgerStorage().flush();
 
         ReadOnlyFileInfo fileInfo = getFileInfo(ledgerId,
-                                                BookieImpl.getCurrentDirectories(bsConfs.get(0).getLedgerDirs()));
+                                                BookieImpl.getCurrentDirectories(confByIndex(0).getLedgerDirs()));
         fileInfo.readHeader();
         ByteBuf explicitLacBufReadFromFileInfo = fileInfo.getExplicitLac();
 
@@ -288,7 +289,7 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
     public void testGetListOfEntriesOfLedger() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
-        int numOfBookies = bs.size();
+        int numOfBookies = bookieCount();
         int numOfEntries = 5;
         BookKeeper.DigestType digestType = BookKeeper.DigestType.CRC32;
         BookKeeper bkc = new BookKeeper(conf);
@@ -298,7 +299,7 @@ public class LedgerStorageTest extends BookKeeperClusterTestCase {
             lh.addEntry("000".getBytes());
         }
 
-        ServerConfiguration newBookieConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration newBookieConf = new ServerConfiguration(confByIndex(0));
         /*
          * by reusing bookieServerConfig and setting metadataServiceUri to null
          * we can create/start new Bookie instance using the same data
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
index 35913ba..a5e2e13 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpdateCookieCmdTest.java
@@ -102,8 +102,8 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         updateCookie("-b", "ip", false);
 
         // start bookie to ensure everything works fine
-        ServerConfiguration conf = bsConfs.get(0);
-        BookieServer restartBookie = startBookie(conf);
+        ServerConfiguration conf = confByIndex(0);
+        BookieServer restartBookie = startAndAddBookie(conf).getServer();
         restartBookie.shutdown();
     }
 
@@ -113,7 +113,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
     @Test
     public void testUpdateCookieWithInvalidOption() throws Exception {
         String[] argv = new String[] { "updatecookie", "-b", "invalidBookieID" };
-        final ServerConfiguration conf = bsConfs.get(0);
+        final ServerConfiguration conf = confByIndex(0);
         updateCookie(argv, -1, conf);
 
         argv = new String[] { "updatecookie", "-b" };
@@ -143,7 +143,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
         updateCookie("-b", "hostname", true);
 
         // creates cookie with ipaddress
-        ServerConfiguration conf = bsConfs.get(0);
+        ServerConfiguration conf = confByIndex(0);
         conf.setUseHostNameAsBookieID(true); // sets to hostname
         Cookie cookie = Cookie.readFromRegistrationManager(rm, conf).getValue();
         Cookie.Builder cookieBuilder = Cookie.newBuilder(cookie);
@@ -175,15 +175,15 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
     @Test
     public void testDuplicateUpdateCookieIpAddress() throws Exception {
         String[] argv = new String[] { "updatecookie", "-b", "ip" };
-        final ServerConfiguration conf = bsConfs.get(0);
+        final ServerConfiguration conf = confByIndex(0);
         conf.setUseHostNameAsBookieID(true);
         updateCookie(argv, -1, conf);
     }
 
     @Test
     public void testWhenNoCookieExists() throws Exception {
-        ServerConfiguration conf = bsConfs.get(0);
-        BookieServer bks = bs.get(0);
+        ServerConfiguration conf = confByIndex(0);
+        BookieServer bks = serverByIndex(0);
         bks.shutdown();
 
         String zkCookiePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf)
@@ -213,8 +213,8 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
 
     private void updateCookie(String option, String optionVal, boolean useHostNameAsBookieID, boolean useShortHostName)
             throws Exception {
-        ServerConfiguration conf = new ServerConfiguration(bsConfs.get(0));
-        BookieServer bks = bs.get(0);
+        ServerConfiguration conf = new ServerConfiguration(confByIndex(0));
+        BookieServer bks = serverByIndex(0);
         bks.shutdown();
 
         conf.setUseHostNameAsBookieID(!useHostNameAsBookieID);
@@ -252,7 +252,7 @@ public class UpdateCookieCmdTest extends BookKeeperClusterTestCase {
 
     private void updateCookie(String[] argv, int exitCode, ServerConfiguration conf) throws KeeperException,
             InterruptedException, IOException, UnknownHostException, Exception {
-        BookieServer bks = bs.get(0);
+        BookieServer bks = serverByIndex(0);
         bks.shutdown();
 
         LOG.info("Perform updatecookie command");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index bb95286..335050d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -110,14 +110,12 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
         return jc;
     }
 
-    static File newV1JournalDirectory() throws Exception {
-        File d = IOUtils.createTempDir("bookie", "tmpdir");
+    static File initV1JournalDirectory(File d) throws Exception {
         writeJournal(d, 100, "foobar".getBytes()).close();
         return d;
     }
 
-    static File newV1LedgerDirectory() throws Exception {
-        File d = IOUtils.createTempDir("bookie", "tmpdir");
+    static File initV1LedgerDirectory(File d) throws Exception {
         writeLedgerDir(d, "foobar".getBytes());
         return d;
     }
@@ -138,15 +136,13 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
         }
     }
 
-    static File newV2JournalDirectory() throws Exception {
-        File d = newV1JournalDirectory();
-        createVersion2File(d);
+    static File initV2JournalDirectory(File d) throws Exception {
+        createVersion2File(initV1JournalDirectory(d));
         return d;
     }
 
-    static File newV2LedgerDirectory() throws Exception {
-        File d = newV1LedgerDirectory();
-        createVersion2File(d);
+    static File initV2LedgerDirectory(File d) throws Exception {
+        createVersion2File(initV1LedgerDirectory(d));
         return d;
     }
 
@@ -190,28 +186,22 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
 
     @Test
     public void testUpgradeV1toCurrent() throws Exception {
-        File journalDir = newV1JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV1LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV1JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV1LedgerDirectory(createTempDir("bookie", "ledger"));
         testUpgradeProceedure(zkUtil.getZooKeeperConnectString(), journalDir.getPath(), ledgerDir.getPath());
     }
 
     @Test
     public void testUpgradeV2toCurrent() throws Exception {
-        File journalDir = newV2JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV2LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV2JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV2LedgerDirectory(createTempDir("bookie", "ledger"));
         testUpgradeProceedure(zkUtil.getZooKeeperConnectString(), journalDir.getPath(), ledgerDir.getPath());
     }
 
     @Test
     public void testUpgradeCurrent() throws Exception {
-        File journalDir = newV2JournalDirectory();
-        tmpDirs.add(journalDir);
-        File ledgerDir = newV2LedgerDirectory();
-        tmpDirs.add(ledgerDir);
+        File journalDir = initV2JournalDirectory(createTempDir("bookie", "journal"));
+        File ledgerDir = initV2LedgerDirectory(createTempDir("bookie", "ledger"));
         testUpgradeProceedure(zkUtil.getZooKeeperConnectString(), journalDir.getPath(), ledgerDir.getPath());
 
         // Upgrade again
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
index 3baa1bb..2001441 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperAdminTest.java
@@ -122,10 +122,10 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
     }
 
     public void testTriggerAudit(boolean storeSystemTimeAsLedgerUnderreplicatedMarkTime) throws Exception {
-        ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
-        thisServerConf
-                .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
-        restartBookies(thisServerConf);
+        restartBookies(c -> {
+                c.setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
+                return c;
+            });
         ClientConfiguration thisClientConf = new ClientConfiguration(baseClientConf);
         thisClientConf
                 .setStoreSystemTimeAsLedgerUnderreplicatedMarkTime(storeSystemTimeAsLedgerUnderreplicatedMarkTime);
@@ -150,7 +150,7 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         ledgerHandle.addEntry(0, "data".getBytes());
         ledgerHandle.close();
 
-        BookieServer bookieToKill = bs.get(1);
+        BookieServer bookieToKill = serverByIndex(1);
         killBookie(1);
         /*
          * since lostBookieRecoveryDelay is set, when a bookie is died, it will
@@ -163,8 +163,8 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         assertTrue("There are supposed to be underreplicatedledgers", underreplicatedLedgerItr.hasNext());
         UnderreplicatedLedger underreplicatedLedger = underreplicatedLedgerItr.next();
         assertEquals("Underreplicated ledgerId", ledgerId, underreplicatedLedger.getLedgerId());
-        assertTrue("Missingreplica of Underreplicated ledgerId should contain " + bookieToKill.getBookieId(),
-                underreplicatedLedger.getReplicaList().contains(bookieToKill.getBookieId().toString()));
+        assertTrue("Missingreplica of Underreplicated ledgerId should contain " + bookieToKill,
+                underreplicatedLedger.getReplicaList().contains(bookieToKill.getBookieId().getId()));
         if (storeSystemTimeAsLedgerUnderreplicatedMarkTime) {
             long ctimeOfURL = underreplicatedLedger.getCtime();
             assertTrue("ctime of underreplicated ledger should be greater than test starttime",
@@ -179,7 +179,7 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
     @Test
     public void testBookieInit() throws Exception {
         int bookieindex = 0;
-        ServerConfiguration confOfExistingBookie = bsConfs.get(bookieindex);
+        ServerConfiguration confOfExistingBookie = confByIndex(bookieindex);
         Assert.assertFalse("initBookie shouldn't have succeeded, since bookie is still running with that configuration",
                 BookKeeperAdmin.initBookie(confOfExistingBookie));
         killBookie(bookieindex);
@@ -437,10 +437,11 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
     @Test
     public void testGetListOfEntriesOfNonExistingLedger() throws Exception {
         long nonExistingLedgerId = 56789L;
+
         try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
-            for (int i = 0; i < bs.size(); i++) {
+            for (int i = 0; i < bookieCount(); i++) {
                 CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
-                        .asyncGetListOfEntriesOfLedger(bs.get(i).getBookieId(), nonExistingLedgerId);
+                    .asyncGetListOfEntriesOfLedger(addressByIndex(i), nonExistingLedgerId);
                 try {
                     futureResult.get();
                     fail("asyncGetListOfEntriesOfLedger is supposed to be failed with NoSuchLedgerExistsException");
@@ -467,9 +468,9 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
             lh.close();
         }
         try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
-            for (int i = 0; i < bs.size(); i++) {
+            for (int i = 0; i < bookieCount(); i++) {
                 CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
-                        .asyncGetListOfEntriesOfLedger(bs.get(i).getBookieId(), lId);
+                    .asyncGetListOfEntriesOfLedger(addressByIndex(i), lId);
                 AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
                 assertEquals("Number of entries", numOfEntries,
                         availabilityOfEntriesOfLedger.getTotalNumOfAvailableEntries());
@@ -500,9 +501,9 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         }
 
         try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
-            for (int i = 0; i < bs.size(); i++) {
+            for (int i = 0; i < bookieCount(); i++) {
                 CompletableFuture<AvailabilityOfEntriesOfLedger> futureResult = bkAdmin
-                        .asyncGetListOfEntriesOfLedger(bs.get(i).getBookieId(), lId);
+                        .asyncGetListOfEntriesOfLedger(addressByIndex(i), lId);
                 AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = futureResult.get();
                 /*
                  * since num of bookies in the ensemble is 2 and
@@ -529,22 +530,22 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
 
         try (BookKeeperAdmin bkAdmin = new BookKeeperAdmin(zkUtil.getZooKeeperConnectString())) {
             Collection<BookieId> availableBookies = bkAdmin.getAvailableBookies();
-            Assert.assertEquals(availableBookies.size(), bs.size());
+            Assert.assertEquals(availableBookies.size(), bookieCount());
 
-            for (int i = 0; i < bs.size(); i++) {
-                availableBookies.contains(bs.get(i).getBookieId());
+            for (int i = 0; i < bookieCount(); i++) {
+                availableBookies.contains(addressByIndex(i));
             }
 
-            BookieServer killedBookie = bs.get(1);
+            BookieServer killedBookie = serverByIndex(1);
             killBookieAndWaitForZK(1);
 
             Collection<BookieId> remainingBookies = bkAdmin.getAvailableBookies();
-            Assert.assertFalse(remainingBookies.contains(killedBookie.getBookieId()));
+            Assert.assertFalse(remainingBookies.contains(killedBookie));
 
             Collection<BookieId> allBookies = bkAdmin.getAllBookies();
-            for (int i = 0; i < bs.size(); i++) {
-                remainingBookies.contains(bs.get(i).getBookieId());
-                allBookies.contains(bs.get(i).getBookieId());
+            for (int i = 0; i < bookieCount(); i++) {
+                remainingBookies.contains(addressByIndex(i));
+                allBookies.contains(addressByIndex(i));
             }
 
             Assert.assertEquals(remainingBookies.size(), allBookies.size() - 1);
@@ -575,7 +576,7 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
          * since no entry is added, callback is supposed to fail with
          * NoSuchLedgerExistsException.
          */
-        bkAdmin.asyncGetListOfEntriesOfLedger(bs.get(0).getBookieId(), lId)
+        bkAdmin.asyncGetListOfEntriesOfLedger(addressByIndex(0), lId)
                 .whenComplete((availabilityOfEntriesOfLedger, throwable) -> {
                     exceptionInCallback.set(throwable != null);
                     if (throwable != null) {
@@ -679,9 +680,9 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
         }
 
         ServerConfiguration bkConf = newServerConfiguration().setForceReadOnlyBookie(readonly);
-        BookieServer bkServer = startBookie(bkConf);
+        BookieServer bkServer = startBookie(bkConf).getServer();
 
-        String bookieId = bkServer.getBookieId().toString();
+        BookieId bookieId = bkServer.getBookieId();
         String host = bkServer.getLocalAddress().getHostName();
         int port = bkServer.getLocalAddress().getPort();
 
@@ -700,7 +701,7 @@ public class BookKeeperAdminTest extends BookKeeperClusterTestCase {
 
             assertThat(bookieServiceInfo.getEndpoints().size(), is(1));
             BookieServiceInfo.Endpoint endpoint = bookieServiceInfo.getEndpoints().stream()
-                    .filter(e -> Objects.equals(e.getId(), bookieId))
+                    .filter(e -> Objects.equals(e.getId(), bookieId.getId()))
                     .findFirst()
                     .get();
             assertNotNull("Endpoint " + bookieId + " not found.", endpoint);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index f614dcd..681f607 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -118,8 +118,7 @@ public class BookKeeperCloseTest extends BookKeeperClusterTestCase {
                     return super.readEntry(ledgerId, entryId);
                 }
             };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, delayBookie));
+        startAndAddBookie(conf, delayBookie);
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
index c7db87d..a13484f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperDiskSpaceWeightedLedgerPlacementTest.java
@@ -93,9 +93,7 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                 }
             }
         };
-        bsConfs.add(conf);
-        BookieServer server = startBookie(conf, bookieWithCustomFreeDiskSpace);
-        bs.add(server);
+        BookieServer server = startAndAddBookie(conf, bookieWithCustomFreeDiskSpace).getServer();
         client.blockUntilBookieWeightIs(server.getBookieId(), Optional.of(initialFreeDiskSpace));
         if (useFinal == null) {
             ready.set(true);
@@ -114,8 +112,8 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             BookKeeperCheckInfoReader client,
             BookieServer bookie, final long freeDiskSpace)
             throws Exception {
-        for (int i = 0; i < bs.size(); i++) {
-            if (bs.get(i).getBookieId().equals(bookie.getBookieId())) {
+        for (int i = 0; i < bookieCount(); i++) {
+            if (addressByIndex(i).equals(bookie.getBookieId())) {
                 return replaceBookieWithCustomFreeDiskSpaceBookie(client, i, freeDiskSpace);
             }
         }
@@ -126,7 +124,7 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
             BookKeeperCheckInfoReader client,
             int bookieIdx, long initialFreeDiskSpace,
              long finalFreeDiskSpace, AtomicBoolean useFinal) throws Exception {
-        BookieId addr = bs.get(bookieIdx).getBookieId();
+        BookieId addr = addressByIndex(bookieIdx);
         LOG.info("Killing bookie {}", addr);
         ServerConfiguration conf = killBookieAndWaitForZK(bookieIdx);
         client.blockUntilBookieWeightIs(addr, Optional.empty());
@@ -156,10 +154,8 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                 replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, multiple * freeDiskSpace);
             }
         }
-        Map<BookieId, Integer> m = new HashMap<BookieId, Integer>();
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        Map<BookieId, Integer> m = new HashMap<>();
+        bookieAddresses().forEach(a -> m.put(a, 0));
 
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
@@ -171,12 +167,12 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
         // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
         for (int i = 0; i < numBookies - 2; i++) {
-            double ratio1 = (double) m.get(bs.get(numBookies - 2).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio1 = (double) m.get(addressByIndex(numBookies - 2))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                     Math.abs(ratio1 - multiple) < 1);
-            double ratio2 = (double) m.get(bs.get(numBookies - 1).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio2 = (double) m.get(addressByIndex(numBookies - 1))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
                     Math.abs(ratio2 - multiple) < 1);
         }
@@ -206,10 +202,8 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                 replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, multiple * freeDiskSpace);
             }
         }
-        Map<BookieId, Integer> m = new HashMap<BookieId, Integer>();
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        Map<BookieId, Integer> m = new HashMap<>();
+        bookieAddresses().forEach(a -> m.put(a, 0));
 
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
@@ -221,31 +215,30 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
         // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
         for (int i = 0; i < numBookies - 2; i++) {
-            double ratio1 = (double) m.get(bs.get(numBookies - 2).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio1 = (double) m.get(addressByIndex(numBookies - 2))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                     Math.abs(ratio1 - multiple) < 1);
-            double ratio2 = (double) m.get(bs.get(numBookies - 1).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio2 = (double) m.get(addressByIndex(numBookies - 1))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
-                    Math.abs(ratio2 - multiple) < 1);
+            Math.abs(ratio2 - multiple) < 1);
         }
 
         // Restart the bookies in such a way that the first 2 bookies go from 1MB to 3MB free space and the last
         // 2 bookies go from 3MB to 1MB
-        BookieServer server1 = bs.get(0);
-        BookieServer server2 = bs.get(1);
-        BookieServer server3 = bs.get(numBookies - 2);
-        BookieServer server4 = bs.get(numBookies - 1);
+        BookieServer server1 = serverByIndex(0);
+        BookieServer server2 = serverByIndex(1);
+        BookieServer server3 = serverByIndex(numBookies - 2);
+        BookieServer server4 = serverByIndex(numBookies - 1);
 
         server1 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server1, multiple * freeDiskSpace);
         server2 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server2, multiple * freeDiskSpace);
         server3 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server3, freeDiskSpace);
         server4 = replaceBookieWithCustomFreeDiskSpaceBookie(client, server4, freeDiskSpace);
 
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        bookieAddresses().forEach(a -> m.put(a, 0));
+
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieId b : lh.getLedgerMetadata().getEnsembleAt(0)) {
@@ -256,18 +249,18 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
         // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
         for (int i = 0; i < numBookies; i++) {
-            if (server1.getBookieId().equals(bs.get(i).getBookieId())
-                    || server2.getBookieId().equals(bs.get(i).getBookieId())) {
+            if (server1.getLocalAddress().equals(addressByIndex(i))
+                    || server2.getLocalAddress().equals(addressByIndex(i))) {
                 continue;
             }
-            double ratio1 = (double) m.get(server1.getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio1 = (double) m.get(server1)
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                     Math.abs(ratio1 - multiple) < 1);
-            double ratio2 = (double) m.get(server2.getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio2 = (double) m.get(server2)
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
-                    Math.abs(ratio2 - multiple) < 1);
+            Math.abs(ratio2 - multiple) < 1);
         }
         client.close();
     }
@@ -296,10 +289,8 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                 replaceBookieWithCustomFreeDiskSpaceBookie(client, 0, multiple * freeDiskSpace);
             }
         }
-        Map<BookieId, Integer> m = new HashMap<BookieId, Integer>();
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        Map<BookieId, Integer> m = new HashMap<>();
+        bookieAddresses().forEach(a -> m.put(a, 0));
 
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
@@ -310,22 +301,21 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
 
         // make sure that bookies with higher weight are chosen 3X as often as the median;
         // since the number of ledgers is small (2000), there may be variation
-        double ratio1 = (double) m.get(bs.get(numBookies - 2).getBookieId())
-            / (double) m.get(bs.get(0).getBookieId());
+        double ratio1 = (double) m.get(addressByIndex(numBookies - 2))
+            / (double) m.get(addressByIndex(0));
         assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                 Math.abs(ratio1 - multiple) < 1);
-        double ratio2 = (double) m.get(bs.get(numBookies - 1).getBookieId())
-            / (double) m.get(bs.get(1).getBookieId());
+        double ratio2 = (double) m.get(addressByIndex(numBookies - 1))
+            / (double) m.get(addressByIndex(1));
         assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
-                Math.abs(ratio2 - multiple) < 1);
+        Math.abs(ratio2 - multiple) < 1);
 
         // Bring down the 2 bookies that had higher weight; after this the allocation to all
         // the remaining bookies should be uniform
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
-        BookieServer server1 = bs.get(numBookies - 2);
-        BookieServer server2 = bs.get(numBookies - 1);
+        bookieAddresses().forEach(a -> m.put(a, 0));
+
+        BookieServer server1 = serverByIndex(numBookies - 2);
+        BookieServer server2 = serverByIndex(numBookies - 1);
         killBookieAndWaitForZK(numBookies - 1);
         killBookieAndWaitForZK(numBookies - 2);
 
@@ -338,17 +328,17 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
 
         // make sure that bookies with higher weight are chosen 3X as often as the median;
         for (int i = 0; i < numBookies - 3; i++) {
-            double delta = Math.abs((double) m.get(bs.get(i).getBookieId())
-                    - (double) m.get(bs.get(i + 1).getBookieId()));
-            delta = (delta * 100) / (double) m.get(bs.get(i + 1).getBookieId());
+            double delta = Math.abs((double) m.get(addressByIndex(i))
+                    - (double) m.get(addressByIndex(i + 1)));
+            delta = (delta * 100) / (double) m.get(addressByIndex(i + 1));
             // the deviation should be less than 30%
             assertTrue("Weigheted placement is not honored: " + delta, delta <= 30);
         }
         // since the following 2 bookies were down, they shouldn't ever be selected
-        assertTrue("Weigheted placement is not honored" + m.get(server1.getBookieId()),
-                m.get(server1.getBookieId()) == 0);
-        assertTrue("Weigheted placement is not honored" + m.get(server2.getBookieId()),
-                m.get(server2.getBookieId()) == 0);
+        assertTrue("Weigheted placement is not honored" + m.get(server1),
+                m.get(server1) == 0);
+        assertTrue("Weigheted placement is not honored" + m.get(server2),
+                m.get(server2) == 0);
 
         client.close();
     }
@@ -376,10 +366,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // let the last two bookies be down initially
         ServerConfiguration conf1 = killBookieAndWaitForZK(numBookies - 1);
         ServerConfiguration conf2 = killBookieAndWaitForZK(numBookies - 2);
-        Map<BookieId, Integer> m = new HashMap<BookieId, Integer>();
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        Map<BookieId, Integer> m = new HashMap<>();
+
+        bookieAddresses().forEach(a -> m.put(a, 0));
 
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
@@ -391,9 +380,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight are chosen 3X as often as the median;
         // since the number of ledgers is small (2000), there may be variation
         for (int i = 0; i < numBookies - 3; i++) {
-            double delta = Math.abs((double) m.get(bs.get(i).getBookieId())
-                    - (double) m.get(bs.get(i + 1).getBookieId()));
-            delta = (delta * 100) / (double) m.get(bs.get(i + 1).getBookieId());
+            double delta = Math.abs((double) m.get(addressByIndex(i))
+                    - (double) m.get(addressByIndex(i + 1)));
+            delta = (delta * 100) / (double) m.get(addressByIndex(i + 1));
             // the deviation should be less than 30%
             assertTrue("Weigheted placement is not honored: " + delta, delta <= 30);
         }
@@ -402,9 +391,8 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         restartBookie(client, conf1, multiple * freeDiskSpace, multiple * freeDiskSpace, null);
         restartBookie(client, conf2, multiple * freeDiskSpace, multiple * freeDiskSpace, null);
 
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        bookieAddresses().forEach(a -> m.put(a, 0));
+
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieId b : lh.getLedgerMetadata().getEnsembleAt(0)) {
@@ -415,12 +403,12 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
         // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
         for (int i = 0; i < numBookies - 2; i++) {
-            double ratio1 = (double) m.get(bs.get(numBookies - 2).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio1 = (double) m.get(addressByIndex(numBookies - 2))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                     Math.abs(ratio1 - multiple) < 1);
-            double ratio2 = (double) m.get(bs.get(numBookies - 1).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio2 = (double) m.get(addressByIndex(numBookies - 1))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
                     Math.abs(ratio2 - multiple) < 1);
         }
@@ -456,10 +444,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
                         client, 0, freeDiskSpace, multiple * freeDiskSpace, useHigherValue);
             }
         }
-        Map<BookieId, Integer> m = new HashMap<BookieId, Integer>();
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        Map<BookieId, Integer> m = new HashMap<>();
+
+        bookieAddresses().forEach(a -> m.put(a, 0));
 
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
@@ -469,9 +456,9 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         }
 
         for (int i = 0; i < numBookies - 1; i++) {
-            double delta = Math.abs((double) m.get(bs.get(i).getBookieId())
-                    - (double) m.get(bs.get(i + 1).getBookieId()));
-            delta = (delta * 100) / (double) m.get(bs.get(i + 1).getBookieId());
+            double delta = Math.abs((double) m.get(addressByIndex(i))
+                    - (double) m.get(addressByIndex(i + 1)));
+            delta = (delta * 100) / (double) m.get(addressByIndex(i + 1));
             assertTrue("Weigheted placement is not honored: " + delta, delta <= 30); // the deviation should be <30%
         }
 
@@ -481,15 +468,13 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         Thread.sleep(updateIntervalSecs * 1000);
         for (int i = 0; i < numBookies; i++) {
             if (i < numBookies - 2) {
-                client.blockUntilBookieWeightIs(bs.get(i).getBookieId(), Optional.of(freeDiskSpace));
+                client.blockUntilBookieWeightIs(addressByIndex(i), Optional.of(freeDiskSpace));
             } else {
-                client.blockUntilBookieWeightIs(bs.get(i).getBookieId(), Optional.of(freeDiskSpace * multiple));
+                client.blockUntilBookieWeightIs(addressByIndex(i), Optional.of(freeDiskSpace * multiple));
             }
         }
 
-        for (BookieServer b : bs) {
-            m.put(b.getBookieId(), 0);
-        }
+        bookieAddresses().forEach(a -> m.put(a, 0));
         for (int i = 0; i < 2000; i++) {
             LedgerHandle lh = client.createLedger(3, 3, DigestType.CRC32, "testPasswd".getBytes());
             for (BookieId b : lh.getLedgerMetadata().getEnsembleAt(0)) {
@@ -500,12 +485,12 @@ public class BookKeeperDiskSpaceWeightedLedgerPlacementTest extends BookKeeperCl
         // make sure that bookies with higher weight(the last 2 bookies) are chosen 3X as often as the median;
         // since the number of ledgers created is small (2000), we allow a range of 2X to 4X instead of the exact 3X
         for (int i = 0; i < numBookies - 2; i++) {
-            double ratio1 = (double) m.get(bs.get(numBookies - 2).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio1 = (double) m.get(addressByIndex(numBookies - 2))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio1 - multiple),
                     Math.abs(ratio1 - multiple) < 1);
-            double ratio2 = (double) m.get(bs.get(numBookies - 1).getBookieId())
-                / (double) m.get(bs.get(i).getBookieId());
+            double ratio2 = (double) m.get(addressByIndex(lastBookieIndex()))
+                / (double) m.get(addressByIndex(i));
             assertTrue("Weigheted placement is not honored: " + Math.abs(ratio2 - multiple),
                     Math.abs(ratio2 - multiple) < 1);
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index d1ad5fd..613a19d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -30,9 +30,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import io.netty.util.IllegalReferenceCountException;
-
 import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Collections;
@@ -44,7 +42,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
 import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
@@ -54,7 +51,6 @@ import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -1037,10 +1033,10 @@ public class BookKeeperTest extends BookKeeperClusterTestCase {
 
             // Put all non ensemble bookies to sleep
             LOG.info("Putting all non ensemble bookies to sleep.");
-            for (BookieServer bookieServer : bs) {
+            for (BookieId addr : bookieAddresses()) {
                 try {
-                    if (!lh.getCurrentEnsemble().contains(bookieServer.getBookieId())) {
-                        sleepBookie(bookieServer.getBookieId(), sleepLatchCase2);
+                    if (!lh.getCurrentEnsemble().contains(addr)) {
+                        sleepBookie(addr, sleepLatchCase2);
                     }
                 } catch (UnknownHostException ignored) {}
             }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
index 020fabb..d395ddf 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieDecommissionTest.java
@@ -75,7 +75,7 @@ public class BookieDecommissionTest extends BookKeeperClusterTestCase {
              * if we try to call decommissionBookie for a bookie which is not
              * shutdown, then it should throw BKIllegalOpException
              */
-            bkAdmin.decommissionBookie(bs.get(0).getBookieId());
+            bkAdmin.decommissionBookie(addressByIndex(0));
             fail("Expected BKIllegalOpException because that bookie is not shutdown yet");
         } catch (BKIllegalOpException bkioexc) {
             // expected IllegalException
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java
index 977eb8c..094b2ed 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieNetworkAddressChangeTest.java
@@ -22,7 +22,6 @@ package org.apache.bookkeeper.client;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
 import org.apache.bookkeeper.client.api.BookKeeper;
@@ -30,10 +29,9 @@ import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.api.WriteHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
-import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.ZKRegistrationClient;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.PortManager;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -68,9 +66,7 @@ public class BookieNetworkAddressChangeTest extends BookKeeperClusterTestCase {
 
             // restart bookie, change port
             // on metadata we have a bookieId, not the network address
-            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
-            thisServerConf.setBookiePort(PortManager.nextFreePort());
-            restartBookies(thisServerConf);
+            restartBookies(c -> c);
 
             try (ReadHandle h = bkc
                     .newOpenLedgerOp()
@@ -88,6 +84,7 @@ public class BookieNetworkAddressChangeTest extends BookKeeperClusterTestCase {
     }
 
     @Test
+    @Ignore("PLSR-1850 Seems like restart of the bookie always comes up on same port hence failing this test")
     public void testFollowBookieAddressChangeTrckingDisabled() throws Exception {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
@@ -109,10 +106,7 @@ public class BookieNetworkAddressChangeTest extends BookKeeperClusterTestCase {
 
             // restart bookie, change port
             // on metadata we have a bookieId, not the network address
-            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
-            thisServerConf.setBookiePort(PortManager.nextFreePort());
-            restartBookies(thisServerConf);
-
+            restartBookie(getBookie(0));
             try (ReadHandle h = bkc
                     .newOpenLedgerOp()
                     .withLedgerId(lId)
@@ -157,9 +151,7 @@ public class BookieNetworkAddressChangeTest extends BookKeeperClusterTestCase {
 
             // restart bookie, change port
             // on metadata we have a bookieId, not the network address
-            ServerConfiguration thisServerConf = new ServerConfiguration(baseConf);
-            thisServerConf.setBookiePort(PortManager.nextFreePort());
-            restartBookies(thisServerConf);
+            restartBookies(c -> c);
 
             try (ReadHandle h = bkc
                     .newOpenLedgerOp()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
index ce66c9f..64ba419 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryTest.java
@@ -278,8 +278,8 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
             lh.addEntry(data);
         }
         // start the killed bookie again
-        bsConfs.add(confOfKilledBookie);
-        bs.add(startBookie(confOfKilledBookie));
+        startAndAddBookie(confOfKilledBookie);
+
         // all ensembles should be fully replicated since it is recovered
         assertTrue("Not fully replicated", verifyFullyReplicated(lh, 3 * numEntries));
         lh.close();
@@ -306,9 +306,8 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
 
         // Startup a new bookie server
         startNewBookie();
@@ -356,9 +355,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
 
         // Startup three new bookie servers
         for (int i = 0; i < 3; i++) {
@@ -409,15 +408,15 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
 
         // Startup a new bookie server
         int newBookiePort = startNewBookie();
 
         // Write some more entries for the ledgers so a new ensemble will be
-        // created for them.
+        //created for them.
         writeEntriestoLedgers(numMsgs, 10, lhs);
 
         // Call the sync recover bookie method.
@@ -450,9 +449,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
 
         // Startup three new bookie servers
         for (int i = 0; i < 3; i++) {
@@ -691,8 +690,7 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         }
 
         // restart failed bookie
-        bs.add(startBookie(conf2));
-        bsConfs.add(conf2);
+        startAndAddBookie(conf2);
 
         // recover them
         bkAdmin.recoverBookieData(bookieToKill);
@@ -725,9 +723,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
 
         // Call the async recover bookie method.
         LOG.info("Now recover the data on the killed bookie (" + bookieSrc
@@ -756,10 +754,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
 
         // Shutdown the first bookie server
         LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
-        int removeIndex = r.nextInt(bs.size());
-        BookieId bookieSrc = bs.get(removeIndex).getBookieId();
-        bs.get(removeIndex).shutdown();
-        bs.remove(removeIndex);
+        int removeIndex = r.nextInt(bookieCount());
+        BookieId bookieSrc = addressByIndex(removeIndex);
+        killBookie(removeIndex);
 
         // Startup new bookie server
         startNewBookie();
@@ -799,9 +796,9 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         }
         lh.close();
 
-        BookieId bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+        BookieId bookieSrc = addressByIndex(0);
+        killBookie(0);
+
         startNewBookie();
 
         // Check that entries are missing
@@ -826,9 +823,8 @@ public class BookieRecoveryTest extends BookKeeperClusterTestCase {
         assertTrue("Should be back to fully replication", verifyFullyReplicated(lh, 100));
         lh.close();
 
-        bookieSrc = bs.get(0).getBookieId();
-        bs.get(0).shutdown();
-        bs.remove(0);
+        bookieSrc = addressByIndex(0);
+        killBookie(0);
         startNewBookie();
 
         // Check that entries are missing
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
index 128bef4..e66129c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java
@@ -242,8 +242,7 @@ public class BookieWriteLedgerTest extends
         // Replace the bookie with a fake bookie
         ServerConfiguration conf = killBookie(ensemble.get(0));
         BookieWriteLedgerTest.CorruptReadBookie corruptBookie = new BookieWriteLedgerTest.CorruptReadBookie(conf);
-        bs.add(startBookie(conf, corruptBookie));
-        bsConfs.add(conf);
+        startAndAddBookie(conf, corruptBookie);
 
         i = numEntriesToWrite;
         numEntriesToWrite = numEntriesToWrite + 50;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index 8a0a1c2..5467a09 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -215,8 +215,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
                 throw new IOException("Dead bookie for recovery adds.");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, sBookie));
+        startAndAddBookie(conf, sBookie);
     }
 
     // simulate slow adds, then become normal when recover,
@@ -235,8 +234,7 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
                 throw new IOException("Dead bookie");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, dBookie));
+        startAndAddBookie(conf, dBookie);
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
index 83d9310..c77607e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCmdTest.java
@@ -21,8 +21,6 @@
 package org.apache.bookkeeper.client;
 
 import static junit.framework.TestCase.assertEquals;
-
-import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -67,16 +65,12 @@ public class LedgerCmdTest extends BookKeeperClusterTestCase {
         LOG.info("Create ledger and add entries to it");
         LedgerHandle lh1 = createLedgerWithEntries(bk, 10);
 
-        bs.forEach(bookieServer -> {
-            try {
-                BookieAccessor.forceFlush((BookieImpl) bookieServer.getBookie());
-            } catch (IOException e) {
-                LOG.error("Error forceFlush:", e);
-            }
-        });
+        for (int i = 0; i < bookieCount(); i++) {
+                BookieAccessor.forceFlush((BookieImpl) serverByIndex(i).getBookie());
+        }
 
         String[] argv = { "ledger", Long.toString(lh1.getId()) };
-        final ServerConfiguration conf = bsConfs.get(0);
+        final ServerConfiguration conf = confByIndex(0);
         conf.setUseHostNameAsBookieID(true);
 
         BookieShell bkShell =
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
index e8f83d1..59a7f50 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java
@@ -143,8 +143,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         }
 
         // shutdown first bookie server
-        bs.get(0).shutdown();
-        bs.remove(0);
+        killBookie(0);
 
         /*
          * Try to open ledger.
@@ -196,8 +195,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
                 // drop request to simulate a slow and failed bookie
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, fakeBookie));
+        startAndAddBookie(conf, fakeBookie);
 
         // avoid not-enough-bookies case
         startNewBookie();
@@ -209,9 +207,8 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         }
 
         conf = killBookie(host);
-        bsConfs.add(conf);
         // the bookie goes normally
-        bs.add(startBookie(conf));
+        startAndAddBookie(conf);
 
         /*
          * Try to open ledger.
@@ -256,8 +253,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
                 throw new IOException("Couldn't write for some reason");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, deadBookie1));
+        startAndAddBookie(conf, deadBookie1);
 
         // kill first bookie server
         BookieId bookie1 = lhbefore.getCurrentEnsemble().get(0);
@@ -273,8 +269,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         }
 
         // restart the first server, kill the second
-        bsConfs.add(conf1);
-        bs.add(startBookie(conf1));
+        startAndAddBookie(conf1);
         BookieId bookie2 = lhbefore.getCurrentEnsemble().get(1);
         ServerConfiguration conf2 = killBookie(bookie2);
 
@@ -298,8 +293,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
         assertTrue("Open call should have completed", openLatch.await(5, TimeUnit.SECONDS));
         assertFalse("Open should not have succeeded", returnCode.get() == BKException.Code.OK);
 
-        bsConfs.add(conf2);
-        bs.add(startBookie(conf2));
+        startAndAddBookie(conf2);
 
         LedgerHandle lhafter = bkc.openLedger(lhbefore.getId(), digestType,
                 "".getBytes());
@@ -337,8 +331,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
                 throw new IOException("Couldn't write for some reason");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, deadBookie1));
+        startAndAddBookie(conf, deadBookie1);
 
         // kill first bookie server
         BookieId bookie1 = lhbefore.getCurrentEnsemble().get(0);
@@ -420,8 +413,7 @@ public class LedgerRecoveryTest extends BookKeeperClusterTestCase {
                 throw new IOException("Couldn't write entries for some reason");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, rBookie));
+        startAndAddBookie(conf, rBookie);
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java
index 8ec887e..54cfaa5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MdcContextTest.java
@@ -181,8 +181,8 @@ public class MdcContextTest extends BookKeeperClusterTestCase {
     @Test
     public void testAddFailsWithReadOnlyBookie() throws Exception {
         for (int i = 0; i < 3; ++i) {
-            Bookie bookie = bs.get(i).getBookie();
-            File[] ledgerDirs = bsConfs.get(i).getLedgerDirs();
+            Bookie bookie = serverByIndex(i).getBookie();
+            File[] ledgerDirs = confByIndex(i).getLedgerDirs();
             LedgerDirsManager ledgerDirsManager = ((BookieImpl) bookie).getLedgerDirsManager();
             ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index ef4d26b..ae7ce9f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -595,8 +595,7 @@ public class ParallelLedgerRecoveryTest extends BookKeeperClusterTestCase {
         ServerConfiguration conf = killBookie(address);
         conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
         DelayResponseBookie fakeBookie = new DelayResponseBookie(conf);
-        bs.add(startBookie(conf, fakeBookie));
-        bsConfs.add(conf);
+        startAndAddBookie(conf, fakeBookie);
 
         // 1) bk0 write two entries
         lh0.addEntry("entry-0".getBytes(UTF_8));
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index 93077af..9b5853a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -90,8 +90,8 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
         Assert.assertEquals("There should be no read only bookies initially.",
                 Collections.emptySet(), readonlyBookies1);
 
-        BookieId bookieId0 = bs.get(0).getBookieId();
-        BookieId bookieId1 = bs.get(1).getBookieId();
+        BookieId bookieId0 = getBookie(0);
+        BookieId bookieId1 = getBookie(1);
 
         boolean isUnavailable1 = bookieWatcher.isBookieUnavailable(bookieId0);
         Assert.assertFalse("The bookie should not be unavailable.", isUnavailable1);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
index 996c040..eb83573 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDelayEnsembleChange.java
@@ -28,20 +28,18 @@ import static org.apache.bookkeeper.client.BookKeeperClientStats.LEDGER_ENSEMBLE
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
 import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -158,10 +156,8 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         assertEquals("There should be no ensemble change if delaying ensemble change is enabled.",
                      1, lh.getLedgerMetadata().getAllEnsembles().size());
 
-        bsConfs.add(conf0);
-        bs.add(startBookie(conf0));
-        bsConfs.add(conf1);
-        bs.add(startBookie(conf1));
+        startAndAddBookie(conf0);
+        startAndAddBookie(conf1);
 
         for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
             lh.addEntry(data);
@@ -194,11 +190,15 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         }
 
         for (BookieId addr : lh.getLedgerMetadata().getAllEnsembles().get(0L)) {
+            StringBuilder nameBuilder = new StringBuilder(CLIENT_SCOPE);
+            nameBuilder.append('.').
+                    append("bookie_").
+                    append(TestUtils.buildStatsCounterPathFromBookieID(addr)).
+                    append('.').
+                    append(LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION);
             assertTrue(
                     LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION + " should be > 0 for " + addr,
-                    bkc.getTestStatsProvider().getCounter(
-                            CLIENT_SCOPE + ".bookie_" + addr.toString().replace('-', '_')
-                                    + "." + LEDGER_ENSEMBLE_BOOKIE_DISTRIBUTION)
+                    bkc.getTestStatsProvider().getCounter(nameBuilder.toString())
                             .get() > 0);
         }
         assertTrue(
@@ -267,12 +267,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         assertEquals(firstFragment.get(3), secondFragment.get(3));
         assertEquals(firstFragment.get(4), secondFragment.get(4));
 
-        bsConfs.add(conf0);
-        bs.add(startBookie(conf0));
-        bsConfs.add(conf1);
-        bs.add(startBookie(conf1));
-        bsConfs.add(conf2);
-        bs.add(startBookie(conf2));
+        startAndAddBookie(conf0);
+        startAndAddBookie(conf1);
+        startAndAddBookie(conf2);
 
         for (int i = 4 * numEntries; i < 5 * numEntries; i++) {
             lh.addEntry(data);
@@ -320,12 +317,9 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
         assertEquals("There should be ensemble change if ack quorum is broken.",
                      2, lh.getLedgerMetadata().getAllEnsembles().size());
 
-        bsConfs.add(conf0);
-        bs.add(startBookie(conf0));
-        bsConfs.add(conf1);
-        bs.add(startBookie(conf1));
-        bsConfs.add(conf2);
-        bs.add(startBookie(conf2));
+        startAndAddBookie(conf0);
+        startAndAddBookie(conf1);
+        startAndAddBookie(conf2);
 
         for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
             lh.addEntry(data);
@@ -377,8 +371,7 @@ public class TestDelayEnsembleChange extends BookKeeperClusterTestCase {
                      2, lh.getLedgerMetadata().getAllEnsembles().size());
 
         for (ServerConfiguration conf : confs) {
-            bsConfs.add(conf);
-            bs.add(startBookie(conf));
+            startAndAddBookie(conf);
         }
 
         for (int i = 2 * numEntries; i < 3 * numEntries; i++) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
index 9a3c41e..ad34d0f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestDisableEnsembleChange.java
@@ -209,8 +209,7 @@ public class TestDisableEnsembleChange extends BookKeeperClusterTestCase {
                 addLatch.await(1000, TimeUnit.MILLISECONDS));
         assertEquals(res.get(), 0xdeadbeef);
         // start the original bookie
-        bsConfs.add(killedConf);
-        bs.add(startBookie(killedConf));
+        startAndAddBookie(killedConf);
         assertTrue("Add entry operation should complete at this point.",
                 addLatch.await(10000, TimeUnit.MILLISECONDS));
         assertEquals(res.get(), BKException.Code.OK);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
index f60161d..40b6c43 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedAndEntry.java
@@ -123,8 +123,7 @@ public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
         for (int i = 0; i < numBookies; i++) {
             ServerConfiguration conf = newServerConfiguration();
             Bookie b = new FakeBookie(conf, expectedEntryIdToFail, i != 0);
-            bs.add(startBookie(conf, b));
-            bsConfs.add(conf);
+            startAndAddBookie(conf, b);
         }
 
         // create bookkeeper
@@ -240,9 +239,7 @@ public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
         ServerConfiguration bsConf = killBookie(0);
         // start it with a slow bookie
         Bookie b = new SlowReadLacBookie(bsConf, lacToSlowRead, readLatch);
-        bs.add(startBookie(bsConf, b));
-        bsConfs.add(bsConf);
-
+        startAndAddBookie(bsConf, b);
         // create bookkeeper
         BookKeeper newBk = new BookKeeper(newConf);
         // create ledger
@@ -289,4 +286,4 @@ public class TestReadLastConfirmedAndEntry extends BookKeeperClusterTestCase {
 
         newBk.close();
     }
-}
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
index c4ec8f7..54406bc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestReadLastConfirmedLongPoll.java
@@ -189,8 +189,7 @@ public class TestReadLastConfirmedLongPoll extends BookKeeperClusterTestCase {
 
             // start the bookies
             for (ServerConfiguration conf : confs) {
-                bs.add(startBookie(conf));
-                bsConfs.add(conf);
+                startAndAddBookie(conf);
             }
         }
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
index a4a63fa..4024e2a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestTryReadLastConfirmed.java
@@ -144,8 +144,7 @@ public class TestTryReadLastConfirmed extends BookKeeperClusterTestCase {
 
             // start the bookies
             for (ServerConfiguration conf : confs) {
-                bs.add(startBookie(conf));
-                bsConfs.add(conf);
+                startAndAddBookie(conf);
             }
         }
         lh.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
index 62286e1..e514121 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerCmdTest.java
@@ -71,7 +71,7 @@ public class UpdateLedgerCmdTest extends BookKeeperClusterTestCase {
         }
 
         String[] argv = new String[] { "updateledgers", "-b", "hostname", "-v", "true", "-p", "2" };
-        final ServerConfiguration conf = bsConfs.get(0);
+        final ServerConfiguration conf = confByIndex(0);
         conf.setUseHostNameAsBookieID(true);
         BookieSocketAddress toBookieId = BookieImpl.getBookieAddress(conf);
         BookieId toBookieAddr = new BookieSocketAddress(toBookieId.getHostName() + ":"
@@ -95,12 +95,12 @@ public class UpdateLedgerCmdTest extends BookKeeperClusterTestCase {
         for (int i = 1; i < 40; i++) {
             ledgers.add(createLedgerWithEntries(bk, 0));
         }
-        BookieId srcBookie = bs.get(0).getBookieId();
+        BookieId srcBookie = getBookie(0);
         BookieId destBookie = new BookieSocketAddress("1.1.1.1", 2181).toBookieId();
         String[] argv = new String[] { "updateBookieInLedger", "-sb", srcBookie.toString(), "-db",
                 destBookie.toString(), "-v", "true", "-p", "2" };
-        final ServerConfiguration conf = bsConfs.get(0);
-        bs.get(0).shutdown();
+        final ServerConfiguration conf = confByIndex(0);
+        serverByIndex(0).shutdown();
         updateLedgerCmd(argv, 0, conf);
         int updatedLedgersCount = getUpdatedLedgersCount(bk, ledgers, srcBookie);
         assertEquals("Failed to update the ledger metadata with new bookie-address", 0, updatedLedgersCount);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
index 52cbd7e..adc0a14 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/UpdateLedgerOpTest.java
@@ -195,7 +195,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
             LOG.info("Create ledger and add entries to it");
             LedgerHandle lh = createLedgerWithEntries(bk, 100);
 
-            BookieServer bookieServer = bs.get(0);
+            BookieServer bookieServer = serverByIndex(0);
             List<BookieId> ensemble = lh.getLedgerMetadata().getEnsembleAt(0);
             BookieSocketAddress curBookieAddr = null;
             for (BookieId bookieSocketAddress : ensemble) {
@@ -216,8 +216,7 @@ public class UpdateLedgerOpTest extends BookKeeperClusterTestCase {
             bookieServer.shutdown();
 
             ServerConfiguration serverConf1 = newServerConfiguration();
-            bsConfs.add(serverConf1);
-            bs.add(startBookie(serverConf1));
+            startAndAddBookie(serverConf1);
 
             final CountDownLatch latch = new CountDownLatch(1);
             final AtomicInteger rc = new AtomicInteger(BKException.Code.OK);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
index 4b13ebe..9b49d33 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureTest.java
@@ -24,9 +24,7 @@ package org.apache.bookkeeper.proto;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-
 import io.netty.buffer.UnpooledByteBufAllocator;
-
 import java.lang.reflect.Field;
 import java.nio.channels.FileChannel;
 import java.util.Enumeration;
@@ -34,8 +32,8 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.bookie.Bookie;
+import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.bookie.Journal;
 import org.apache.bookkeeper.bookie.SlowBufferedChannel;
 import org.apache.bookkeeper.bookie.SlowInterleavedLedgerStorage;
@@ -47,13 +45,12 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,9 +120,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
         }
     }
 
-    private void mockJournal(Bookie bookie, long getDelay, long addDelay, long flushDelay) throws Exception {
+    private Bookie bookieWithMockedJournal(ServerConfiguration conf,
+                                           long getDelay, long addDelay, long flushDelay) throws Exception {
+        Bookie bookie = new BookieImpl(conf);
         if (getDelay <= 0 && addDelay <= 0 && flushDelay <= 0) {
-            return;
+            return bookie;
         }
 
         List<Journal> journals = getJournals(bookie);
@@ -141,6 +140,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
 
             journals.set(i, mock);
         }
+        return bookie;
     }
 
     @SuppressWarnings("unchecked")
@@ -154,7 +154,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteNoBackpressureSlowJournal() throws Exception {
         //disable backpressure for writes
-        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        confByIndex(0).setMaxAddsInProgressLimit(0);
         addDelay = 1;
 
         doWritesNoBackpressure(0);
@@ -163,9 +163,9 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteNoBackpressureSlowJournalFlush() throws Exception {
         //disable backpressure for writes
-        bsConfs.get(0).setMaxAddsInProgressLimit(0);
+        confByIndex(0).setMaxAddsInProgressLimit(0);
         // to increase frequency of flushes
-        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        confByIndex(0).setJournalAdaptiveGroupWrites(false);
         flushDelay = 1;
 
         doWritesNoBackpressure(0);
@@ -174,7 +174,7 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteWithBackpressureSlowJournal() throws Exception {
         //enable backpressure with MAX_PENDING writes in progress
-        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setMaxAddsInProgressLimit(MAX_PENDING);
         flushDelay = 1;
 
         doWritesWithBackpressure(0);
@@ -184,9 +184,9 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteWithBackpressureSlowJournalFlush() throws Exception {
         //enable backpressure with MAX_PENDING writes in progress
-        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setMaxAddsInProgressLimit(MAX_PENDING);
         // to increase frequency of flushes
-        bsConfs.get(0).setJournalAdaptiveGroupWrites(false);
+        confByIndex(0).setJournalAdaptiveGroupWrites(false);
         flushDelay = 1;
 
         doWritesWithBackpressure(0);
@@ -195,11 +195,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteNoBackpressureSlowInterleavedStorage() throws Exception {
         //disable backpressure for writes
-        bsConfs.get(0).setMaxAddsInProgressLimit(0);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(0);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
 
         doWritesNoBackpressure(0);
     }
@@ -207,11 +207,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteWithBackpressureSlowInterleavedStorage() throws Exception {
         //enable backpressure with MAX_PENDING writes in progress
-        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
 
         doWritesWithBackpressure(0);
     }
@@ -219,11 +219,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteNoBackpressureSlowInterleavedStorageFlush() throws Exception {
         //disable backpressure for writes
-        bsConfs.get(0).setMaxAddsInProgressLimit(0);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(0);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
 
         doWritesNoBackpressure(0);
     }
@@ -231,11 +231,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteWithBackpressureSlowInterleavedStorageFlush() throws Exception {
         //enable backpressure with MAX_PENDING writes in progress
-        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
 
         doWritesWithBackpressure(0);
     }
@@ -243,16 +243,16 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteNoBackpressureSortedStorage() throws Exception {
         //disable backpressure for writes
-        bsConfs.get(0).setMaxAddsInProgressLimit(0);
-        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(0);
+        confByIndex(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
         // one for memtable being flushed, one for the part accepting the data
         assertTrue("for the test, memtable should not keep more entries than allowed",
                 ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
-        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+        confByIndex(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
 
         doWritesNoBackpressure(0);
     }
@@ -260,16 +260,16 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testWriteWithBackpressureSortedStorage() throws Exception {
         //enable backpressure with MAX_PENDING writes in progress
-        bsConfs.get(0).setMaxAddsInProgressLimit(MAX_PENDING);
-        bsConfs.get(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxAddsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setLedgerStorageClass(SlowSortedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
         // one for memtable being flushed, one for the part accepting the data
         assertTrue("for the test, memtable should not keep more entries than allowed",
                 ENTRIES_IN_MEMTABLE * 2 <= MAX_PENDING);
-        bsConfs.get(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
+        confByIndex(0).setSkipListSizeLimit(data.length * ENTRIES_IN_MEMTABLE - 1);
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_ADD_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_FLUSH_DELAY, "10");
 
         doWritesWithBackpressure(0);
     }
@@ -277,11 +277,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     @Test
     public void testReadsNoBackpressure() throws Exception {
         //disable backpressure for reads
-        bsConfs.get(0).setMaxReadsInProgressLimit(0);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxReadsInProgressLimit(0);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
 
         final BookieRequestProcessor brp = generateDataAndDoReads(0);
 
@@ -292,11 +292,11 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
    @Test
     public void testReadsWithBackpressure() throws Exception {
         //enable backpressure for reads
-        bsConfs.get(0).setMaxReadsInProgressLimit(MAX_PENDING);
-        bsConfs.get(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
-        bsConfs.get(0).setWriteBufferBytes(data.length);
+        confByIndex(0).setMaxReadsInProgressLimit(MAX_PENDING);
+        confByIndex(0).setLedgerStorageClass(SlowInterleavedLedgerStorage.class.getName());
+        confByIndex(0).setWriteBufferBytes(data.length);
 
-        bsConfs.get(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
+        confByIndex(0).setProperty(SlowInterleavedLedgerStorage.PROP_SLOW_STORAGE_GET_DELAY, "1");
 
         final BookieRequestProcessor brp = generateDataAndDoReads(0);
 
@@ -305,12 +305,12 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     }
 
     private BookieRequestProcessor generateDataAndDoReads(final int bkId) throws Exception {
-        BookieServer bks = bs.get(bkId);
-        bks.shutdown();
-        bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
-        bks.start();
-        bs.set(bkId, bks);
+        Assert.assertThat("should be only one bookie",
+                          bookieCount(), Matchers.equalTo(1));
+        ServerConfiguration conf = killBookie(0);
+        BookieServer bks = startAndAddBookie(conf,
+                                             bookieWithMockedJournal(conf, getDelay, addDelay, flushDelay))
+            .getServer();
 
         LOG.info("creating ledgers");
         // Create ledgers
@@ -346,12 +346,12 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     // here we expect that backpressure is disabled and number of writes in progress
     // will exceed the limit
     private void doWritesNoBackpressure(final int bkId) throws Exception {
-        BookieServer bks = bs.get(bkId);
-        bks.shutdown();
-        bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
-        bks.start();
-        bs.set(bkId, bks);
+        Assert.assertThat("should be only one bookie",
+                          bookieCount(), Matchers.equalTo(1));
+        ServerConfiguration conf = killBookie(0);
+        BookieServer bks = startAndAddBookie(conf,
+                                             bookieWithMockedJournal(conf, getDelay, addDelay, flushDelay))
+            .getServer();
 
         LOG.info("Creating ledgers");
         LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
@@ -391,12 +391,12 @@ public class BookieBackpressureTest extends BookKeeperClusterTestCase
     // here we expect that backpressure is enabled and number of writes in progress
     // will never exceed the limit
     private void doWritesWithBackpressure(final int bkId) throws Exception {
-        BookieServer bks = bs.get(bkId);
-        bks.shutdown();
-        bks = new BookieServer(bsConfs.get(bkId));
-        mockJournal(bks.getBookie(), getDelay, addDelay, flushDelay);
-        bks.start();
-        bs.set(bkId, bks);
+        Assert.assertThat("should be only one bookie",
+                          bookieCount(), Matchers.equalTo(1));
+        ServerConfiguration conf = killBookie(0);
+        BookieServer bks = startAndAddBookie(conf,
+                                             bookieWithMockedJournal(conf, getDelay, addDelay, flushDelay))
+            .getServer();
 
         LOG.info("Creating ledgers");
         LedgerHandle[] lhs = new LedgerHandle[NUM_OF_LEDGERS];
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
index 0c3e7b5..0d1234f 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/NetworkLessBookieTest.java
@@ -56,8 +56,8 @@ public class NetworkLessBookieTest extends BookKeeperClusterTestCase {
             }
         }
 
-        for (BookieServer bk : bs) {
-            for (Channel channel : bk.nettyServer.allChannels) {
+        for (int i = 0; i < bookieCount(); i++) {
+            for (Channel channel : serverByIndex(i).nettyServer.allChannels) {
                 if (!(channel instanceof LocalChannel)) {
                     fail();
                 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
index 39e72ea..0888f7a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java
@@ -173,10 +173,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase {
 
     // copy from TestAuth
     BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
-        bsConfs.add(conf);
-        BookieServer s = startBookie(conf);
-        bs.add(s);
-        return s;
+        return startAndAddBookie(conf).getServer();
     }
 
     CompatClient42 newCompatClient(BookieId addr) throws Exception {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
index 17eb474..3abeb81 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java
@@ -250,8 +250,7 @@ public class TestPerChannelBookieClient extends BookKeeperClusterTestCase {
                 return super.readEntry(ledgerId, entryId);
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, delayBookie));
+        startAndAddBookie(conf, delayBookie);
 
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
         final OrderedExecutor executor = getOrderedSafeExecutor();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
index 7147556..377e7d9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
@@ -26,13 +26,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
-
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.zookeeper.ZooKeeper;
@@ -87,14 +86,14 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
         BookieServer auditor = verifyAuditor();
 
         // shutdown bookie which is not an auditor
-        int indexOf = bs.indexOf(auditor);
+        int indexOf = indexOfServer(auditor);
         int bkIndexDownBookie;
-        if (indexOf < bs.size() - 1) {
+        if (indexOf < lastBookieIndex()) {
             bkIndexDownBookie = indexOf + 1;
         } else {
             bkIndexDownBookie = indexOf - 1;
         }
-        shutdownBookie(bs.get(bkIndexDownBookie));
+        shutdownBookie(serverByIndex(bkIndexDownBookie));
 
         startNewBookie();
         startNewBookie();
@@ -115,14 +114,11 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
         shutdownBookie(auditor);
 
         BookieServer newAuditor1 = waitForNewAuditor(auditor);
-        bs.remove(auditor);
-
         shutdownBookie(newAuditor1);
         BookieServer newAuditor2 = waitForNewAuditor(newAuditor1);
         assertNotSame(
                 "Auditor re-election is not happened for auditor failure!",
                 auditor, newAuditor2);
-        bs.remove(newAuditor1);
     }
 
     /**
@@ -160,9 +156,7 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
         assertNotSame(
                 "Auditor re-election is not happened for auditor failure!",
                 auditor, newAuditor);
-        int indexOfDownBookie = bs.indexOf(auditor);
-        bs.remove(indexOfDownBookie);
-        bsConfs.remove(indexOfDownBookie);
+
         List<String> children = zkc.getChildren(electionPath, false);
         for (String child : children) {
             byte[] data = zkc.getData(electionPath + '/' + child, false, null);
@@ -181,15 +175,11 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
     public void testRestartAuditorBookieAfterCrashing() throws Exception {
         BookieServer auditor = verifyAuditor();
 
-        shutdownBookie(auditor);
         String addr = auditor.getBookieId().toString();
 
         // restarting Bookie with same configurations.
-        int indexOfDownBookie = bs.indexOf(auditor);
-        ServerConfiguration serverConfiguration = bsConfs
-                .get(indexOfDownBookie);
-        bs.remove(indexOfDownBookie);
-        bsConfs.remove(indexOfDownBookie);
+        ServerConfiguration serverConfiguration = shutdownBookie(auditor);
+
         auditorElectors.remove(addr);
         startBookie(serverConfiguration);
         // starting corresponding auditor elector
@@ -215,9 +205,8 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
     }
 
     private void startAuditorElectors() throws Exception {
-        for (BookieServer bserver : bs) {
-            String addr = bserver.getBookieId().toString();
-            startAuditorElector(addr);
+        for (BookieId addr : bookieAddresses()) {
+            startAuditorElector(addr.toString());
         }
     }
 
@@ -240,7 +229,8 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
         List<BookieServer> auditors = new LinkedList<BookieServer>();
         byte[] data = zkc.getData(electionPath, false, null);
         assertNotNull("Auditor election failed", data);
-        for (BookieServer bks : bs) {
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieServer bks = serverByIndex(i);
             if (new String(data).contains(bks.getBookieId() + "")) {
                 auditors.add(bks);
             }
@@ -248,14 +238,17 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
         return auditors;
     }
 
-    private void shutdownBookie(BookieServer bkServer) throws Exception {
-        String addr = bkServer.getBookieId().toString();
+    private ServerConfiguration shutdownBookie(BookieServer bkServer) throws Exception {
+        int index = indexOfServer(bkServer);
+        String addr = addressByIndex(index).toString();
         LOG.debug("Shutting down bookie:" + addr);
 
         // shutdown bookie which is an auditor
-        bkServer.shutdown();
+        ServerConfiguration conf = killBookie(index);
+
         // stopping corresponding auditor elector
         auditorElectors.get(addr).shutdown();
+        return conf;
     }
 
     private BookieServer waitForNewAuditor(BookieServer auditor)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 93714c5..ed78932 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
@@ -146,8 +147,8 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
     }
 
     private void startAuditorElectors() throws Exception {
-        for (BookieServer bserver : bs) {
-            String addr = bserver.getBookieId().toString();
+        for (String addr : bookieAddresses().stream().map(Object::toString)
+                 .collect(Collectors.toList())) {
             AuditorElector auditorElector = new AuditorElector(addr, baseConf);
             auditorElectors.put(addr, auditorElector);
             auditorElector.start();
@@ -176,7 +177,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(ledgerList
                 .size());
 
-        int bkShutdownIndex = bs.size() - 1;
+        int bkShutdownIndex = lastBookieIndex();
         String shutdownBookie = shutdownBookie(bkShutdownIndex);
 
         // grace period for publishing the bk-ledger
@@ -211,12 +212,12 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
 
         LOG.debug("Created following ledgers : {}, {}", lh1, lh2);
 
-        int bkShutdownIndex = bs.size() - 1;
-        ServerConfiguration bookieConf1 = bsConfs.get(bkShutdownIndex);
+        int bkShutdownIndex = lastBookieIndex();
+        ServerConfiguration bookieConf1 = confByIndex(bkShutdownIndex);
         String shutdownBookie = shutdownBookie(bkShutdownIndex);
 
         // restart the failed bookie
-        bs.add(startBookie(bookieConf1));
+        startAndAddBookie(bookieConf1);
 
         waitForLedgerMissingReplicas(lh1.getId(), 10, shutdownBookie);
         waitForLedgerMissingReplicas(lh2.getId(), 10, shutdownBookie);
@@ -231,13 +232,13 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         LedgerHandle lh1 = createAndAddEntriesToLedger();
 
         // failing first bookie
-        shutdownBookie(bs.size() - 1);
+        shutdownBookie(lastBookieIndex());
 
         // simulate re-replication
         doLedgerRereplication(lh1.getId());
 
         // failing another bookie
-        String shutdownBookie = shutdownBookie(bs.size() - 1);
+        String shutdownBookie = shutdownBookie(lastBookieIndex());
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for ledgers to be marked as under replicated");
@@ -258,8 +259,8 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         // disabling ledger replication
         urLedgerMgr.disableLedgerReplication();
         ArrayList<String> shutdownBookieList = new ArrayList<String>();
-        shutdownBookieList.add(shutdownBookie(bs.size() - 1));
-        shutdownBookieList.add(shutdownBookie(bs.size() - 1));
+        shutdownBookieList.add(shutdownBookie(lastBookieIndex()));
+        shutdownBookieList.add(shutdownBookie(lastBookieIndex()));
 
         assertFalse("Ledger replication is not disabled!", urReplicaLatch
                 .await(1, TimeUnit.SECONDS));
@@ -304,12 +305,13 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(count);
 
         final int bkIndex = 2;
-        ServerConfiguration bookieConf = bsConfs.get(bkIndex);
-        BookieServer bk = bs.get(bkIndex);
+        ServerConfiguration bookieConf = confByIndex(bkIndex);
+        BookieServer bk = serverByIndex(bkIndex);
         bookieConf.setReadOnlyModeEnabled(true);
 
         ((BookieImpl) bk.getBookie()).getStateManager().doTransitionToReadOnlyMode();
-        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(bkIndex))).get(30, TimeUnit.SECONDS);
+        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(confByIndex(bkIndex)))
+            .get(30, TimeUnit.SECONDS);
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for Auditor to finish ledger check.");
@@ -330,14 +332,15 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         int count = ledgerList.size();
         final CountDownLatch underReplicaLatch = registerUrLedgerWatcher(count);
 
-        int bkIndex = bs.size() - 1;
-        LOG.debug("Moving bookie {} {} to read only...", bkIndex, bs.get(bkIndex));
-        ServerConfiguration bookieConf = bsConfs.get(bkIndex);
-        BookieServer bk = bs.get(bkIndex);
+        int bkIndex = lastBookieIndex();
+        LOG.debug("Moving bookie {} {} to read only...", bkIndex, serverByIndex(bkIndex));
+        ServerConfiguration bookieConf = confByIndex(bkIndex);
+        BookieServer bk = serverByIndex(bkIndex);
         bookieConf.setReadOnlyModeEnabled(true);
 
         ((BookieImpl) bk.getBookie()).getStateManager().doTransitionToReadOnlyMode();
-        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(bkIndex))).get(30, TimeUnit.SECONDS);
+        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(confByIndex(bkIndex)))
+            .get(30, TimeUnit.SECONDS);
 
         // grace period for publishing the bk-ledger
         LOG.debug("Waiting for Auditor to finish ledger check.");
@@ -752,7 +755,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
 
         // shutdown a non auditor bookie to avoid an election
         int idx1 = getShutDownNonAuditorBookieIdx("");
-        ServerConfiguration conf1 = bsConfs.get(idx1);
+        ServerConfiguration conf1 = confByIndex(idx1);
         String shutdownBookie1 = shutdownBookie(idx1);
 
         // wait for 2 seconds and there shouldn't be any under replicated ledgers
@@ -762,7 +765,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
                 urLedgerList.size());
 
         // restart the bookie we shut down above
-        bs.add(startBookie(conf1));
+        startAndAddBookie(conf1);
 
         // Now to simulate the rolling upgrade, bring down a bookie different from
         // the one we brought down/up above.
@@ -852,7 +855,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
     }
 
     private String shutdownBookie(int bkShutdownIndex) throws Exception {
-        BookieServer bkServer = bs.get(bkShutdownIndex);
+        BookieServer bkServer = serverByIndex(bkShutdownIndex);
         String bookieAddr = bkServer.getBookieId().toString();
         LOG.debug("Shutting down bookie:" + bookieAddr);
         killBookie(bkShutdownIndex);
@@ -932,9 +935,10 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
         List<BookieServer> auditors = new LinkedList<BookieServer>();
         byte[] data = zkc.getData(electionPath, false, null);
         assertNotNull("Auditor election failed", data);
-        for (BookieServer bks : bs) {
-            if (new String(data).contains(bks.getBookieId() + "")) {
-                auditors.add(bks);
+        for (int i = 0; i < bookieCount(); i++) {
+            BookieId bookieId = addressByIndex(i);
+            if (new String(data).contains(bookieId + "")) {
+                auditors.add(serverByIndex(i));
             }
         }
         assertEquals("Multiple Bookies acting as Auditor!", 1, auditors
@@ -950,9 +954,9 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
 
     private String  shutDownNonAuditorBookie() throws Exception {
         // shutdown bookie which is not an auditor
-        int indexOf = bs.indexOf(getAuditorBookie());
+        int indexOf = indexOfServer(getAuditorBookie());
         int bkIndexDownBookie;
-        if (indexOf < bs.size() - 1) {
+        if (indexOf < lastBookieIndex()) {
             bkIndexDownBookie = indexOf + 1;
         } else {
             bkIndexDownBookie = indexOf - 1;
@@ -962,10 +966,10 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
 
     private int getShutDownNonAuditorBookieIdx(String exclude) throws Exception {
         // shutdown bookie which is not an auditor
-        int indexOf = bs.indexOf(getAuditorBookie());
+        int indexOf = indexOfServer(getAuditorBookie());
         int bkIndexDownBookie = 0;
-        for (int i = 0; i < bs.size(); i++) {
-            if (i == indexOf || bs.get(i).getBookieId().toString().equals(exclude)) {
+        for (int i = 0; i <= lastBookieIndex(); i++) {
+            if (i == indexOf || addressByIndex(i).toString().equals(exclude)) {
                 continue;
             }
             bkIndexDownBookie = i;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index d4f3c26..9b9c1c5 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -67,7 +67,7 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
         conf.setAuditorPeriodicBookieCheckInterval(CHECK_INTERVAL);
         conf.setMetadataServiceUri(metadataServiceUri);
         conf.setProperty("clientConnectTimeoutMillis", 500);
-        String addr = bs.get(0).getBookieId().toString();
+        String addr = addressByIndex(0).toString();
 
         auditorElector = new AuditorElector(addr, conf);
         auditorElector.start();
@@ -86,8 +86,8 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testPeriodicBookieCheckInterval() throws Exception {
-        bsConfs.get(0).setMetadataServiceUri(zkUtil.getMetadataServiceUri());
-        runFunctionWithLedgerManagerFactory(bsConfs.get(0), mFactory -> {
+        confByIndex(0).setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+        runFunctionWithLedgerManagerFactory(confByIndex(0), mFactory -> {
             try (LedgerManager ledgerManager = mFactory.newLedgerManager()) {
                 @Cleanup final LedgerUnderreplicationManager underReplicationManager =
                     mFactory.newLedgerUnderreplicationManager();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index a1b6148..f47486c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -103,10 +103,10 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         super.setUp();
 
         for (int i = 0; i < numBookies; i++) {
-            ServerConfiguration conf = new ServerConfiguration(bsConfs.get(i));
+            ServerConfiguration conf = new ServerConfiguration(confByIndex(i));
             conf.setAuditorPeriodicCheckInterval(CHECK_INTERVAL);
 
-            String addr = bs.get(i).getBookieId().toString();
+            String addr = addressByIndex(i).toString();
 
             AuditorElector auditorElector = new AuditorElector(addr, conf);
             auditorElectors.put(addr, auditorElector);
@@ -115,9 +115,9 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         }
 
         driver = MetadataDrivers.getBookieDriver(
-            URI.create(bsConfs.get(0).getMetadataServiceUri()));
+            URI.create(confByIndex(0).getMetadataServiceUri()));
         driver.initialize(
-            bsConfs.get(0),
+            confByIndex(0),
             () -> {},
             NullStatsLogger.INSTANCE);
     }
@@ -152,10 +152,10 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         }
         lh.close();
 
-        BookieAccessor.forceFlush((BookieImpl) bs.get(0).getBookie());
+        BookieAccessor.forceFlush((BookieImpl) serverByIndex(0).getBookie());
 
 
-        File ledgerDir = bsConfs.get(0).getLedgerDirs()[0];
+        File ledgerDir = confByIndex(0).getLedgerDirs()[0];
         ledgerDir = BookieImpl.getCurrentDirectory(ledgerDir);
         // corrupt of entryLogs
         File[] entryLogs = ledgerDir.listFiles(new FilenameFilter() {
@@ -208,9 +208,9 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         }
         lh.close();
 
-        BookieAccessor.forceFlush((BookieImpl) bs.get(0).getBookie());
+        BookieAccessor.forceFlush((BookieImpl) serverByIndex(0).getBookie());
 
-        File ledgerDir = bsConfs.get(0).getLedgerDirs()[0];
+        File ledgerDir = confByIndex(0).getLedgerDirs()[0];
         ledgerDir = BookieImpl.getCurrentDirectory(ledgerDir);
 
         // corrupt of entryLogs
@@ -283,8 +283,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
                 throw new IOException("Fake I/O exception");
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, deadBookie));
+        startAndAddBookie(conf, deadBookie);
 
         Thread.sleep(CHECK_INTERVAL * 2000);
         assertEquals("Nothing should have tried to read", 0, numReads.get());
@@ -339,8 +338,8 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         }
 
         try (final Auditor auditor = new Auditor(
-                BookieImpl.getBookieId(bsConfs.get(0)).toString(),
-                bsConfs.get(0), NullStatsLogger.INSTANCE)) {
+                BookieImpl.getBookieId(confByIndex(0)).toString(),
+                confByIndex(0), NullStatsLogger.INSTANCE)) {
             final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
             final CountDownLatch latch = new CountDownLatch(1);
             Thread t = new Thread() {
@@ -386,7 +385,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         validateInitialDelayOfCheckAllLedgers(urm, -1, 1000, servConf, bkc);
         validateInitialDelayOfCheckAllLedgers(urm, 999, 1000, servConf, bkc);
         validateInitialDelayOfCheckAllLedgers(urm, 1001, 1000, servConf, bkc);
@@ -480,7 +479,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         validateInitialDelayOfPlacementPolicyCheck(urm, -1, 1000, servConf, bkc);
         validateInitialDelayOfPlacementPolicyCheck(urm, 999, 1000, servConf, bkc);
         validateInitialDelayOfPlacementPolicyCheck(urm, 1001, 1000, servConf, bkc);
@@ -585,7 +584,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         validateInitialDelayOfReplicasCheck(urm, -1, 1000, servConf, bkc);
         validateInitialDelayOfReplicasCheck(urm, 999, 1000, servConf, bkc);
         validateInitialDelayOfReplicasCheck(urm, 1001, 1000, servConf, bkc);
@@ -709,15 +708,15 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         // Identify a bookie in the current ledger ensemble to be replaced
         BookieId replacedBookie = null;
         for (int i = 0; i < numBookies; i++) {
-            if (curEnsemble.contains(bs.get(i).getBookieId())) {
+            if (curEnsemble.contains(addressByIndex(i))) {
                 bookieIdx = i;
-                replacedBookie = bs.get(i).getBookieId();
+                replacedBookie = addressByIndex(i);
                 break;
             }
         }
         assertNotEquals("Couldn't find ensemble bookie in bookie list", -1, bookieIdx);
 
-        LOG.info("Killing bookie " + bs.get(bookieIdx).getBookieId());
+        LOG.info("Killing bookie " + addressByIndex(bookieIdx));
         ServerConfiguration conf = killBookie(bookieIdx);
         Bookie writeFailingBookie = new BookieImpl(conf) {
             @Override
@@ -739,8 +738,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
                 }
             }
         };
-        bsConfs.add(conf);
-        bs.add(startBookie(conf, writeFailingBookie));
+        startAndAddBookie(conf, writeFailingBookie);
         return replacedBookie;
     }
 
@@ -784,7 +782,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         // now start the replication workers
         List<ReplicationWorker> l = new ArrayList<ReplicationWorker>();
         for (int i = 0; i < numBookies; i++) {
-            ReplicationWorker rw = new ReplicationWorker(bsConfs.get(i), NullStatsLogger.INSTANCE);
+            ReplicationWorker rw = new ReplicationWorker(confByIndex(i), NullStatsLogger.INSTANCE);
             rw.start();
             l.add(rw);
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
index 9e134ee..254de9a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPlacementPolicyCheckTest.java
@@ -82,8 +82,8 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
     public void setUp() throws Exception {
         super.setUp();
         StaticDNSResolver.reset();
-        driver = MetadataDrivers.getBookieDriver(URI.create(bsConfs.get(0).getMetadataServiceUri()));
-        driver.initialize(bsConfs.get(0), () -> {
+        driver = MetadataDrivers.getBookieDriver(URI.create(confByIndex(0).getMetadataServiceUri()));
+        driver.initialize(confByIndex(0), () -> {
         }, NullStatsLogger.INSTANCE);
     }
 
@@ -181,7 +181,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
                 .build();
         lm.createLedgerMetadata(4L, initMeta).get();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
         setServerConfigPropertiesForRackPlacement(servConf);
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
@@ -272,7 +272,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
                 .build();
         lm.createLedgerMetadata(2L, initMeta).get();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
         setServerConfigPropertiesForRackPlacement(servConf);
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
@@ -408,7 +408,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
             Thread.sleep(5000);
         }
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         servConf.setUnderreplicatedLedgerRecoveryGracePeriod(underreplicatedLedgerRecoveryGracePeriod);
         setServerConfigPropertiesForRackPlacement(servConf);
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
@@ -509,7 +509,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
         lm.createLedgerMetadata(2L, initMeta).get();
         numOfLedgersNotAdheringToPlacementPolicy++;
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
         setServerConfigPropertiesForRackPlacement(servConf);
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
@@ -555,7 +555,7 @@ public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
         LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
         LedgerManager lm = mFactory.newLedgerManager();
 
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         servConf.setDesiredNumZonesPerWriteQuorum(3);
         servConf.setMinNumZonesPerWriteQuorum(2);
         setServerConfigPropertiesForZonePlacement(servConf);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
index 4d3d7a3..550147b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorReplicasCheckTest.java
@@ -88,8 +88,8 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
     public void setUp() throws Exception {
         super.setUp();
         StaticDNSResolver.reset();
-        driver = MetadataDrivers.getBookieDriver(URI.create(bsConfs.get(0).getMetadataServiceUri()));
-        driver.initialize(bsConfs.get(0), () -> {
+        driver = MetadataDrivers.getBookieDriver(URI.create(confByIndex(0).getMetadataServiceUri()));
+        driver.initialize(confByIndex(0), () -> {
         }, NullStatsLogger.INSTANCE);
     }
 
@@ -221,9 +221,8 @@ public class AuditorReplicasCheckTest extends BookKeeperClusterTestCase {
             MultiKeyMap<String, Integer> errorReturnValueForGetAvailabilityOfEntriesOfLedger,
             int expectedNumLedgersFoundHavingNoReplicaOfAnEntry,
             int expectedNumLedgersHavingLessThanAQReplicasOfAnEntry,
-            int expectedNumLedgersHavingLessThanWQReplicasOfAnEntry) throws CompatibilityException,
-            UnavailableException, UnknownHostException, MetadataException, KeeperException, InterruptedException {
-        ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
+            int expectedNumLedgersHavingLessThanWQReplicasOfAnEntry) throws Exception {
+        ServerConfiguration servConf = new ServerConfiguration(confByIndex(0));
         setServerConfigProperties(servConf);
         MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index 66c4650..b1210b7 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -51,7 +51,7 @@ public class AuditorRollingRestartTest extends BookKeeperClusterTestCase {
     @Test
     public void testAuditingDuringRollingRestart() throws Exception {
         runFunctionWithLedgerManagerFactory(
-            bsConfs.get(0),
+            confByIndex(0),
             mFactory -> {
                 try {
                     testAuditingDuringRollingRestart(mFactory);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
index 8e672fd..3837606 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuthAutoRecoveryTest.java
@@ -89,7 +89,7 @@ public class AuthAutoRecoveryTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testAuthClientRole() throws Exception {
-        ServerConfiguration config = bsConfs.get(0);
+        ServerConfiguration config = confByIndex(0);
         assertEquals(AuditorClientAuthInterceptorFactory.class.getName(), config.getClientAuthProviderFactoryClass());
         AutoRecoveryMain main = new AutoRecoveryMain(config);
         try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index c222e59..2ed92b2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -45,7 +45,7 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testStartup() throws Exception {
-        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
         try {
             main.start();
             Thread.sleep(500);
@@ -63,7 +63,7 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
      */
     @Test
     public void testShutdown() throws Exception {
-        AutoRecoveryMain main = new AutoRecoveryMain(bsConfs.get(0));
+        AutoRecoveryMain main = new AutoRecoveryMain(confByIndex(0));
         main.start();
         Thread.sleep(500);
         assertTrue("AuditorElector should be running",
@@ -87,9 +87,9 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
         /*
          * initialize three AutoRecovery instances.
          */
-        AutoRecoveryMain main1 = new AutoRecoveryMain(bsConfs.get(0));
-        AutoRecoveryMain main2 = new AutoRecoveryMain(bsConfs.get(1));
-        AutoRecoveryMain main3 = new AutoRecoveryMain(bsConfs.get(2));
+        AutoRecoveryMain main1 = new AutoRecoveryMain(confByIndex(0));
+        AutoRecoveryMain main2 = new AutoRecoveryMain(confByIndex(1));
+        AutoRecoveryMain main3 = new AutoRecoveryMain(confByIndex(2));
 
         /*
          * start main1, make sure all the components are started and main1 is
@@ -99,8 +99,8 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
         ZooKeeper zk1 = zkMetadataClientDriver1.getZk();
         Auditor auditor1 = main1.auditorElector.getAuditor();
 
-        BookieId currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(0), zk1);
-        assertTrue("Current Auditor should be AR1", currentAuditor.equals(BookieImpl.getBookieId(bsConfs.get(0))));
+        BookieId currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(0), zk1);
+        assertTrue("Current Auditor should be AR1", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(0))));
         assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
 
         /*
@@ -116,7 +116,7 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
          * auditors are not running.
          */
         assertTrue("Current Auditor should still be AR1",
-                currentAuditor.equals(BookieImpl.getBookieId(bsConfs.get(0))));
+                currentAuditor.equals(BookieImpl.getBookieId(confByIndex(0))));
         Auditor auditor2 = main2.auditorElector.getAuditor();
         Auditor auditor3 = main3.auditorElector.getAuditor();
         assertTrue("AR2's Auditor should not be running", (auditor2 == null || !auditor2.isRunning()));
@@ -153,8 +153,8 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
         /*
          * the AR3 should be current auditor.
          */
-        currentAuditor = AuditorElector.getCurrentAuditor(bsConfs.get(2), zk3);
-        assertTrue("Current Auditor should be AR3", currentAuditor.equals(BookieImpl.getBookieId(bsConfs.get(2))));
+        currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(2), zk3);
+        assertTrue("Current Auditor should be AR3", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(2))));
         auditor3 = main3.auditorElector.getAuditor();
         assertTrue("Auditor of AR3 should be running", auditor3.isRunning());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
index eb00dc1..05937fe 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieAutoRecoveryTest.java
@@ -172,8 +172,8 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         // starting the replication service, so that he will be able to act as
         // target bookie
         startNewBookie();
-        int newBookieIndex = bs.size() - 1;
-        BookieServer newBookieServer = bs.get(newBookieIndex);
+        int newBookieIndex = lastBookieIndex();
+        BookieServer newBookieServer = serverByIndex(newBookieIndex);
 
         LOG.debug("Waiting to finish the replication of failed bookie : "
                 + replicaToKillAddr);
@@ -226,8 +226,8 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         // starting the replication service, so that he will be able to act as
         // target bookie
         startNewBookie();
-        int newBookieIndex = bs.size() - 1;
-        BookieServer newBookieServer = bs.get(newBookieIndex);
+        int newBookieIndex = lastBookieIndex();
+        BookieServer newBookieServer = serverByIndex(newBookieIndex);
 
         LOG.debug("Waiting to finish the replication of failed bookie : "
                 + replicaToKillAddr);
@@ -293,8 +293,8 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         // starting the replication service, so that he will be able to act as
         // target bookie
         startNewBookie();
-        int newBookieIndex = bs.size() - 1;
-        BookieServer newBookieServer = bs.get(newBookieIndex);
+        int newBookieIndex = lastBookieIndex();
+        BookieServer newBookieServer = serverByIndex(newBookieIndex);
 
         LOG.debug("Waiting to finish the replication of failed bookie : "
                 + replicaToKillAddr);
@@ -427,12 +427,9 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         serverConf2.setUseHostNameAsBookieID(true);
         ServerConfiguration serverConf3 = newServerConfiguration();
         serverConf3.setUseHostNameAsBookieID(true);
-        bsConfs.add(serverConf1);
-        bsConfs.add(serverConf2);
-        bsConfs.add(serverConf3);
-        bs.add(startBookie(serverConf1));
-        bs.add(startBookie(serverConf2));
-        bs.add(startBookie(serverConf3));
+        startAndAddBookie(serverConf1);
+        startAndAddBookie(serverConf2);
+        startAndAddBookie(serverConf3);
 
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
@@ -470,11 +467,10 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         // target bookie
         ServerConfiguration serverConf = newServerConfiguration();
         serverConf.setUseHostNameAsBookieID(false);
-        bsConfs.add(serverConf);
-        bs.add(startBookie(serverConf));
+        startAndAddBookie(serverConf);
 
-        int newBookieIndex = bs.size() - 1;
-        BookieServer newBookieServer = bs.get(newBookieIndex);
+        int newBookieIndex = lastBookieIndex();
+        BookieServer newBookieServer = serverByIndex(newBookieIndex);
 
         LOG.debug("Waiting to finish the replication of failed bookie : "
                 + replicaToKillAddr);
@@ -505,12 +501,9 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         serverConf2.setUseHostNameAsBookieID(true);
         ServerConfiguration serverConf3 = newServerConfiguration();
         serverConf3.setUseHostNameAsBookieID(true);
-        bsConfs.add(serverConf1);
-        bsConfs.add(serverConf2);
-        bsConfs.add(serverConf3);
-        bs.add(startBookie(serverConf1));
-        bs.add(startBookie(serverConf2));
-        bs.add(startBookie(serverConf3));
+        startAndAddBookie(serverConf1);
+        startAndAddBookie(serverConf2);
+        startAndAddBookie(serverConf3);
 
         List<LedgerHandle> listOfLedgerHandle = createLedgersAndAddEntries(1, 5);
         LedgerHandle lh = listOfLedgerHandle.get(0);
@@ -550,11 +543,10 @@ public class BookieAutoRecoveryTest extends BookKeeperClusterTestCase {
         // target bookie
         ServerConfiguration serverConf = newServerConfiguration();
         serverConf.setUseHostNameAsBookieID(true);
-        bsConfs.add(serverConf);
-        bs.add(startBookie(serverConf));
+        startAndAddBookie(serverConf);
 
-        int newBookieIndex = bs.size() - 1;
-        BookieServer newBookieServer = bs.get(newBookieIndex);
+        int newBookieIndex = lastBookieIndex();
+        BookieServer newBookieServer = serverByIndex(newBookieIndex);
 
         LOG.debug("Waiting to finish the replication of failed bookie : "
                 + replicaToKillAddr);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
index 4244ff3..810f106 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/BookieLedgerIndexTest.java
@@ -20,8 +20,6 @@ package org.apache.bookkeeper.replication;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -41,7 +39,6 @@ import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
@@ -181,7 +178,7 @@ public class BookieLedgerIndexTest extends BookKeeperClusterTestCase {
             LedgerHandle lh2 = createAndAddEntriesToLedger();
 
             startNewBookie();
-            shutdownBookie(bs.size() - 2);
+            shutdownBookie(lastBookieIndex() - 1);
 
             // add few more entries after ensemble reformation
             for (int i = 0; i < 10; i++) {
@@ -221,10 +218,8 @@ public class BookieLedgerIndexTest extends BookKeeperClusterTestCase {
         }
     }
 
-    private void shutdownBookie(int bkShutdownIndex) throws IOException {
-        bs.remove(bkShutdownIndex).shutdown();
-        File f = tmpDirs.remove(bkShutdownIndex);
-        FileUtils.deleteDirectory(f);
+    private void shutdownBookie(int bkShutdownIndex) throws Exception {
+        killBookie(bkShutdownIndex);
     }
 
     private LedgerHandle createAndAddEntriesToLedger() throws BKException,
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index f68dfd6..97cad2d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -235,8 +235,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 Thread.sleep(100);
             }
             // restart killed bookie
-            bs.add(startBookie(killedBookieConfig));
-            bsConfs.add(killedBookieConfig);
+            startAndAddBookie(killedBookieConfig);
             while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh
                     .getId(), basePath)) {
                 Thread.sleep(100);
@@ -290,8 +289,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
                 Thread.sleep(100);
             }
             // restart killed bookie
-            bs.add(startBookie(killedBookieConfig));
-            bsConfs.add(killedBookieConfig);
+            startAndAddBookie(killedBookieConfig);
             while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh
                     .getId(), basePath)) {
                 Thread.sleep(100);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
index c0f84d3..8ed3eba 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/GSSAPIBookKeeperTest.java
@@ -178,15 +178,15 @@ public class GSSAPIBookKeeperTest extends BookKeeperClusterTestCase {
     private int entryCount(long ledgerId, ClientConfiguration clientConf)
             throws Exception {
         LOG.info("Counting entries in {}", ledgerId);
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setUseHostNameAsBookieID(true);
-            conf.setBookieAuthProviderFactoryClass(
-                SASLBookieAuthProviderFactory.class.getName());
-        }
         clientConf.setClientAuthProviderFactoryClass(
             SASLClientProviderFactory.class.getName());
 
-        restartBookies();
+        restartBookies(c -> {
+                c.setUseHostNameAsBookieID(true);
+                c.setBookieAuthProviderFactoryClass(
+                        SASLBookieAuthProviderFactory.class.getName());
+                return c;
+            });
 
         try (BookKeeper bkc = new BookKeeper(clientConf, zkc);
             LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
@@ -253,10 +253,7 @@ public class GSSAPIBookKeeperTest extends BookKeeperClusterTestCase {
 
     BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
         System.setProperty(SaslConstants.SASL_SERVICE_NAME, non_default_sasl_service_name);
-        bsConfs.add(conf);
-        BookieServer s = startBookie(conf);
-        bs.add(s);
-        return s;
+        return startAndAddBookie(conf).getServer();
     }
 
     @AfterClass
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
index 9371906..14f70cd 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/sasl/MD5DigestBookKeeperTest.java
@@ -82,15 +82,15 @@ public class MD5DigestBookKeeperTest extends BookKeeperClusterTestCase {
     private int entryCount(long ledgerId, ServerConfiguration bookieConf,
         ClientConfiguration clientConf) throws Exception {
         LOG.info("Counting entries in {}", ledgerId);
-        for (ServerConfiguration conf : bsConfs) {
-            bookieConf.setBookieAuthProviderFactoryClass(
-                SASLBookieAuthProviderFactory.class.getName());
-            bookieConf.setProperty(JAAS_CLIENT_ALLOWED_IDS, ".*hd.*");
-        }
         clientConf.setClientAuthProviderFactoryClass(
             SASLClientProviderFactory.class.getName());
 
-        restartBookies();
+        restartBookies(c -> {
+                c.setBookieAuthProviderFactoryClass(
+                        SASLBookieAuthProviderFactory.class.getName());
+                c.setProperty(JAAS_CLIENT_ALLOWED_IDS, ".*hd.*");
+                return c;
+            });
 
         try (BookKeeper bkc = new BookKeeper(clientConf, zkc);
             LedgerHandle lh = bkc.openLedger(ledgerId, DigestType.CRC32,
@@ -134,10 +134,7 @@ public class MD5DigestBookKeeperTest extends BookKeeperClusterTestCase {
     }
 
     BookieServer startAndStoreBookie(ServerConfiguration conf) throws Exception {
-        bsConfs.add(conf);
-        BookieServer s = startBookie(conf);
-        bs.add(s);
-        return s;
+        return startAndAddBookie(conf).getServer();
     }
 
     @AfterClass
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index 92a575d..e3e65fa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -87,8 +87,9 @@ public class TestHttpService extends BookKeeperClusterTestCase {
         super.setUp();
         baseConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
         baseClientConf.setStoreSystemtimeAsLedgerCreationTime(true);
+
         this.bkHttpServiceProvider = new BKHttpServiceProvider.Builder()
-            .setBookieServer(bs.get(numberOfBookies - 1))
+            .setBookieServer(serverByIndex(numberOfBookies - 1))
             .setServerConfiguration(baseConf)
             .build();
     }
@@ -593,7 +594,7 @@ public class TestHttpService extends BookKeeperClusterTestCase {
 
     AuditorElector auditorElector;
     private Future<?> startAuditorElector() throws Exception {
-        String addr = bs.get(0).getBookieId().toString();
+        String addr = addressByIndex(0).toString();
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setAuditorPeriodicBookieCheckInterval(1);
         conf.setMetadataServiceUri("zk://" + zkUtil.getZooKeeperConnectString() + "/ledgers");
@@ -861,7 +862,9 @@ public class TestHttpService extends BookKeeperClusterTestCase {
         assertEquals(HttpServer.StatusCode.NOT_FOUND.getValue(), response2.getStatusCode());
 
         // Simulate bookies shutting down
-        bs.forEach(bookieServer -> bookieServer.getBookie().getStateManager().forceToShuttingDown());
+        for (int i = 0; i < bookieCount(); i++) {
+            serverByIndex(i).getBookie().getStateManager().forceToShuttingDown();
+        }
         HttpServiceRequest request3 = new HttpServiceRequest(null, HttpServer.Method.GET, null);
         HttpServiceResponse response3 = bookieStateServer.handle(request3);
         assertEquals(HttpServer.StatusCode.SERVICE_UNAVAILABLE.getValue(), response3.getStatusCode());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/service/ListLedgerServiceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/service/ListLedgerServiceTest.java
index a8436ff..c83fb6c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/service/ListLedgerServiceTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/service/ListLedgerServiceTest.java
@@ -60,7 +60,7 @@ public class ListLedgerServiceTest extends BookKeeperClusterTestCase {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        listLedgerService = new ListLedgerService(bsConfs.get(0), bs.get(0));
+        listLedgerService = new ListLedgerService(confByIndex(0), serverByIndex(0));
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
index 565e398..cc28bb2 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java
@@ -29,17 +29,19 @@ import io.netty.buffer.ByteBufAllocator;
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.BookieImpl;
@@ -57,8 +59,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
-import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
-import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.PortManager;
 import org.apache.commons.io.FileUtils;
@@ -91,10 +91,9 @@ public abstract class BookKeeperClusterTestCase {
     protected String metadataServiceUri;
 
     // BookKeeper related variables
-    protected final List<File> tmpDirs = new LinkedList<File>();
-    protected final List<BookieServer> bs = new LinkedList<BookieServer>();
-    protected final List<ServerConfiguration> bsConfs = new LinkedList<ServerConfiguration>();
-    private final Map<BookieId, TestStatsProvider> bsLoggers = new HashMap<>();
+    protected final List<File> tmpDirs = new LinkedList<>();
+    private final List<ServerTester> servers = new LinkedList<>();
+
     protected int numBookies;
     protected BookKeeperTestClient bkc;
     protected boolean useUUIDasBookieId = true;
@@ -107,8 +106,6 @@ public abstract class BookKeeperClusterTestCase {
     protected final ServerConfiguration baseConf = TestBKConfiguration.newServerConfiguration();
     protected final ClientConfiguration baseClientConf = TestBKConfiguration.newClientConfiguration();
 
-    private final Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<>();
-
     private boolean isAutoRecoveryEnabled;
 
     SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
@@ -267,17 +264,10 @@ public abstract class BookKeeperClusterTestCase {
             bkc.close();
         }
 
-        for (BookieServer server : bs) {
-            server.shutdown();
-            AutoRecoveryMain autoRecovery = autoRecoveryProcesses.get(server);
-            if (autoRecovery != null && isAutoRecoveryEnabled()) {
-                autoRecovery.shutdown();
-                LOG.debug("Shutdown auto recovery for bookieserver:"
-                        + server.getBookieId());
-            }
+        for (ServerTester t : servers) {
+            t.shutdown();
         }
-        bs.clear();
-        bsLoggers.clear();
+        servers.clear();
     }
 
     protected void cleanupTempDirs() throws Exception {
@@ -321,23 +311,16 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     protected void stopAllBookies(boolean shutdownClient) throws Exception {
-        for (BookieServer server : bs) {
-            server.shutdown();
+        for (ServerTester t : servers) {
+            t.shutdown();
         }
-        bsConfs.clear();
-        bs.clear();
+        servers.clear();
         if (shutdownClient && bkc != null) {
             bkc.close();
             bkc = null;
         }
     }
 
-    protected void startAllBookies() throws Exception {
-        for (ServerConfiguration conf : bsConfs) {
-            bs.add(startBookie(conf));
-        }
-    }
-
     protected String newMetadataServiceUri(String ledgersRootPath) {
         return zkUtil.getMetadataServiceUri(ledgersRootPath);
     }
@@ -350,32 +333,81 @@ public abstract class BookKeeperClusterTestCase {
      * Get bookie address for bookie at index.
      */
     public BookieId getBookie(int index) throws Exception {
-        if (bs.size() <= index || index < 0) {
-            throw new IllegalArgumentException("Invalid index, there are only " + bs.size()
-                                               + " bookies. Asked for " + index);
+        return servers.get(index).getServer().getBookieId();
+    }
+
+    protected List<BookieId> bookieAddresses() throws Exception {
+        List<BookieId> bookieIds = new ArrayList<>();
+        for (ServerTester a : servers) {
+            bookieIds.add(a.getServer().getBookieId());
+        }
+        return bookieIds;
+    }
+
+    protected List<File> bookieLedgerDirs() throws Exception {
+        return servers.stream()
+            .flatMap(t -> Arrays.stream(t.getConfiguration().getLedgerDirs()))
+            .collect(Collectors.toList());
+    }
+
+    protected List<File> bookieJournalDirs() throws Exception {
+        return servers.stream()
+            .flatMap(t -> Arrays.stream(t.getConfiguration().getJournalDirs()))
+            .collect(Collectors.toList());
+    }
+
+    protected BookieId addressByIndex(int index) throws Exception {
+        return servers.get(index).getServer().getBookieId();
+    }
+
+    protected BookieServer serverByIndex(int index) throws Exception {
+        return servers.get(index).getServer();
+    }
+
+    protected ServerConfiguration confByIndex(int index) throws Exception {
+        return servers.get(index).getConfiguration();
+    }
+
+    private Optional<ServerTester> byAddress(BookieId addr) throws UnknownHostException {
+        for (ServerTester s : servers) {
+            if (s.getServer().getBookieId().equals(addr)) {
+                return Optional.of(s);
+            }
         }
-        return bs.get(index).getBookieId();
+        return Optional.empty();
     }
 
-    public BookieSocketAddress getBookieAddress(int index) throws Exception {
-        return bkc.getBookieAddressResolver().resolve(getBookie(index));
+    protected int indexOfServer(BookieServer b) throws Exception {
+        for (int i = 0; i < servers.size(); i++) {
+            if (servers.get(i).getServer().equals(b)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    protected int lastBookieIndex() {
+        return servers.size() - 1;
+    }
+
+    protected int bookieCount() {
+        return servers.size();
+    }
+
+    private OptionalInt indexByAddress(BookieId addr) throws UnknownHostException {
+        for (int i = 0; i < servers.size(); i++) {
+            if (addr.equals(servers.get(i).getServer().getBookieId())) {
+                return OptionalInt.of(i);
+            }
+        }
+        return OptionalInt.empty();
     }
 
     /**
      * Get bookie configuration for bookie.
      */
     public ServerConfiguration getBkConf(BookieId addr) throws Exception {
-        int bkIndex = 0;
-        for (BookieServer server : bs) {
-            if (server.getBookieId().equals(addr)) {
-                break;
-            }
-            ++bkIndex;
-        }
-        if (bkIndex < bs.size()) {
-            return bsConfs.get(bkIndex);
-        }
-        return null;
+        return byAddress(addr).get().getConfiguration();
     }
 
     /**
@@ -388,21 +420,11 @@ public abstract class BookKeeperClusterTestCase {
      * @throws InterruptedException
      */
     public ServerConfiguration killBookie(BookieId addr) throws Exception {
-        BookieServer toRemove = null;
-        int toRemoveIndex = 0;
-        for (BookieServer server : bs) {
-            if (server.getBookieId().equals(addr)) {
-                server.shutdown();
-                toRemove = server;
-                break;
-            }
-            ++toRemoveIndex;
-        }
-        if (toRemove != null) {
-            stopAutoRecoveryService(toRemove);
-            bs.remove(toRemove);
-            bsLoggers.remove(addr);
-            return bsConfs.remove(toRemoveIndex);
+        Optional<ServerTester> tester = byAddress(addr);
+        if (tester.isPresent()) {
+            servers.remove(tester.get());
+            tester.get().shutdown();
+            return tester.get().getConfiguration();
         }
         return null;
     }
@@ -414,12 +436,10 @@ public abstract class BookKeeperClusterTestCase {
      *          Socket Address
      * @throws InterruptedException
      */
-    public void setBookieToReadOnly(BookieId addr) throws InterruptedException, UnknownHostException {
-        for (BookieServer server : bs) {
-            if (server.getBookieId().equals(addr)) {
-                ((BookieImpl) server.getBookie()).getStateManager().doTransitionToReadOnlyMode();
-                break;
-            }
+    public void setBookieToReadOnly(BookieId addr) throws Exception {
+        Optional<ServerTester> tester = byAddress(addr);
+        if (tester.isPresent()) {
+            tester.get().getServer().getBookie().getStateManager().transitionToReadOnlyMode().get();
         }
     }
 
@@ -434,15 +454,9 @@ public abstract class BookKeeperClusterTestCase {
      * @throws IOException
      */
     public ServerConfiguration killBookie(int index) throws Exception {
-        if (index >= bs.size()) {
-            throw new IOException("Bookie does not exist");
-        }
-        BookieServer server = bs.get(index);
-        server.shutdown();
-        stopAutoRecoveryService(server);
-        bs.remove(server);
-        bsLoggers.remove(server.getBookieId());
-        return bsConfs.remove(index);
+        ServerTester tester = servers.remove(index);
+        tester.shutdown();
+        return tester.getConfiguration();
     }
 
     /**
@@ -453,13 +467,10 @@ public abstract class BookKeeperClusterTestCase {
      * @return configuration of killed bookie
      */
     public ServerConfiguration killBookieAndWaitForZK(int index) throws Exception {
-        if (index >= bs.size()) {
-            throw new IOException("Bookie does not exist");
-        }
-        BookieServer server = bs.get(index);
+        ServerTester tester = servers.get(index); // IKTODO: this method is awful
         ServerConfiguration ret = killBookie(index);
         while (zkc.exists(ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/" + AVAILABLE_NODE + "/"
-                + server.getBookieId().toString(), false) != null) {
+                       + tester.getServer().getBookieId().toString(), false) != null) {
             Thread.sleep(500);
         }
         return ret;
@@ -478,29 +489,29 @@ public abstract class BookKeeperClusterTestCase {
      */
     public CountDownLatch sleepBookie(BookieId addr, final int seconds)
             throws Exception {
-        for (final BookieServer bookie : bs) {
-            if (bookie.getBookieId().equals(addr)) {
-                final CountDownLatch l = new CountDownLatch(1);
-                Thread sleeper = new Thread() {
-                        @Override
-                        public void run() {
-                            try {
-                                bookie.suspendProcessing();
-                                LOG.info("bookie {} is asleep", bookie.getBookieId());
-                                l.countDown();
-                                Thread.sleep(seconds * 1000);
-                                bookie.resumeProcessing();
-                                LOG.info("bookie {} is awake", bookie.getBookieId());
-                            } catch (Exception e) {
-                                LOG.error("Error suspending bookie", e);
-                            }
+        Optional<ServerTester> tester = byAddress(addr);
+        if (tester.isPresent()) {
+            CountDownLatch latch = new CountDownLatch(1);
+            Thread sleeper = new Thread() {
+                    @Override
+                    public void run() {
+                        try {
+                            tester.get().getServer().suspendProcessing();
+                            LOG.info("bookie {} is asleep", tester.get().getAddress());
+                            latch.countDown();
+                            Thread.sleep(seconds * 1000);
+                            tester.get().getServer().resumeProcessing();
+                            LOG.info("bookie {} is awake", tester.get().getAddress());
+                        } catch (Exception e) {
+                            LOG.error("Error suspending bookie", e);
                         }
-                    };
-                sleeper.start();
-                return l;
-            }
+                    }
+                };
+            sleeper.start();
+            return latch;
+        } else {
+            throw new IOException("Bookie not found");
         }
-        throw new IOException("Bookie not found");
     }
 
     /**
@@ -522,10 +533,11 @@ public abstract class BookKeeperClusterTestCase {
 
     public void sleepBookie(BookieId addr, final CountDownLatch l, final CountDownLatch suspendLatch)
             throws InterruptedException, IOException {
-        for (final BookieServer bookie : bs) {
-            if (bookie.getBookieId().equals(addr)) {
-                LOG.info("Sleep bookie {}.", addr);
-                Thread sleeper = new Thread() {
+        Optional<ServerTester> tester = byAddress(addr);
+        if (tester.isPresent()) {
+            BookieServer bookie = tester.get().getServer();
+            LOG.info("Sleep bookie {}.", addr);
+            Thread sleeper = new Thread() {
                     @Override
                     public void run() {
                         try {
@@ -540,11 +552,10 @@ public abstract class BookKeeperClusterTestCase {
                         }
                     }
                 };
-                sleeper.start();
-                return;
-            }
+            sleeper.start();
+        } else {
+            throw new IOException("Bookie not found");
         }
-        throw new IOException("Bookie not found");
     }
 
     /**
@@ -558,7 +569,7 @@ public abstract class BookKeeperClusterTestCase {
      */
     public void restartBookies()
             throws Exception {
-        restartBookies(null);
+        restartBookies(c -> c);
     }
 
     /**
@@ -572,61 +583,30 @@ public abstract class BookKeeperClusterTestCase {
      * @throws BookieException
      */
     public void restartBookie(BookieId addr) throws Exception {
-        BookieServer toRemove = null;
-        int toRemoveIndex = 0;
-        for (BookieServer server : bs) {
-            if (server.getBookieId().equals(addr)) {
-                toRemove = server;
-                break;
-            }
-            ++toRemoveIndex;
-        }
-        if (toRemove != null) {
-            ServerConfiguration newConfig = bsConfs.get(toRemoveIndex);
-            killBookie(toRemoveIndex);
+        OptionalInt toRemove = indexByAddress(addr);
+        if (toRemove.isPresent()) {
+            ServerConfiguration newConfig = killBookie(toRemove.getAsInt());
             Thread.sleep(1000);
-            bs.add(startBookie(newConfig));
-            bsConfs.add(newConfig);
-            return;
+            startAndAddBookie(newConfig);
+        } else {
+            throw new IOException("Bookie not found");
         }
-        throw new IOException("Bookie not found");
     }
 
-    /**
-     * Restart bookie servers using new configuration settings. Also restart the
-     * respective auto recovery process, if isAutoRecoveryEnabled is true.
-     *
-     * @param newConf
-     *            New Configuration Settings
-     * @throws InterruptedException
-     * @throws IOException
-     * @throws KeeperException
-     * @throws BookieException
-     */
-    public void restartBookies(ServerConfiguration newConf)
+    public void restartBookies(Function<ServerConfiguration, ServerConfiguration> reconfFunction)
             throws Exception {
         // shut down bookie server
-        for (BookieServer server : bs) {
+        List<ServerConfiguration> confs = new ArrayList<>();
+        for (ServerTester server : servers) {
             server.shutdown();
-            stopAutoRecoveryService(server);
+            confs.add(server.getConfiguration());
         }
-        bs.clear();
-        bsLoggers.clear();
+        servers.clear();
         Thread.sleep(1000);
         // restart them to ensure we can't
-        for (ServerConfiguration conf : bsConfs) {
-            String bookieId = conf.getBookieId();
-            // ensure the bookie id or port is loaded correctly
-            int port = conf.getBookiePort();
-            if (null != newConf) {
-                conf.loadConf(newConf);
-            }
-            if (bookieId != null) {
-                conf.setBookieId(bookieId);
-            } else {
-                conf.setBookiePort(port);
-            }
-            bs.add(startBookie(conf));
+        for (ServerConfiguration conf : confs) {
+            // ensure the bookie port is loaded correctly
+            startAndAddBookie(reconfFunction.apply(conf));
         }
     }
 
@@ -639,31 +619,34 @@ public abstract class BookKeeperClusterTestCase {
      */
     public int startNewBookie()
             throws Exception {
-        ServerConfiguration conf = newServerConfiguration();
-
-        // use a random BookieId
-        if (useUUIDasBookieId) {
-            conf.setBookieId(UUID.randomUUID().toString());
-        }
+        return startNewBookieAndReturnAddress().getPort();
+    }
 
-        bsConfs.add(conf);
+    public BookieSocketAddress startNewBookieAndReturnAddress()
+            throws Exception {
+        ServerConfiguration conf = newServerConfiguration();
         LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
-        BookieServer server = startBookie(conf);
-        bs.add(server);
-        return server.getLocalAddress().getPort();
+        return startAndAddBookie(conf).getServer().getLocalAddress();
     }
 
     public BookieId startNewBookieAndReturnBookieId()
             throws Exception {
         ServerConfiguration conf = newServerConfiguration();
-        bsConfs.add(conf);
         LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
-        BookieServer server = startBookie(conf);
-        bs.add(server);
+        return startAndAddBookie(conf).getServer().getBookieId();
+    }
 
-        return server.getBookieId();
+    protected ServerTester startAndAddBookie(ServerConfiguration conf) throws Exception {
+        ServerTester server = startBookie(conf);
+        servers.add(server);
+        return server;
     }
 
+    protected ServerTester startAndAddBookie(ServerConfiguration conf, Bookie b) throws Exception {
+        ServerTester server = startBookie(conf, b);
+        servers.add(server);
+        return server;
+    }
     /**
      * Helper method to startup a bookie server using a configuration object.
      * Also, starts the auto recovery process if isAutoRecoveryEnabled is true.
@@ -672,74 +655,54 @@ public abstract class BookKeeperClusterTestCase {
      *            Server Configuration Object
      *
      */
-    protected BookieServer startBookie(ServerConfiguration conf)
+    protected ServerTester startBookie(ServerConfiguration conf)
             throws Exception {
-        TestStatsProvider provider = new TestStatsProvider();
-
-        BookieServer server = new BookieServer(conf, provider.getStatsLogger(""), null);
-        BookieId address = BookieImpl.getBookieId(conf);
-        bsLoggers.put(address, provider);
+        ServerTester tester = new ServerTester(conf);
 
         if (bkc == null) {
             bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
 
+        BookieId address = tester.getServer().getBookieId();
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
             ? bkc.waitForReadOnlyBookie(address)
             : bkc.waitForWritableBookie(address);
 
-        server.start();
+        tester.getServer().start();
 
         waitForBookie.get(30, TimeUnit.SECONDS);
         LOG.info("New bookie '{}' has been created.", address);
 
-        try {
-            startAutoRecovery(server, conf);
-        } catch (CompatibilityException ce) {
-            LOG.error("Exception while starting AutoRecovery!", ce);
-        } catch (UnavailableException ue) {
-            LOG.error("Exception while starting AutoRecovery!", ue);
+        if (isAutoRecoveryEnabled()) {
+            tester.startAutoRecovery();
         }
-        return server;
+        return tester;
     }
 
     /**
      * Start a bookie with the given bookie instance. Also, starts the auto
      * recovery for this bookie, if isAutoRecoveryEnabled is true.
      */
-    protected BookieServer startBookie(ServerConfiguration conf, final Bookie b)
+    protected ServerTester startBookie(ServerConfiguration conf, final Bookie b)
             throws Exception {
-        TestStatsProvider provider = new TestStatsProvider();
-        BookieServer server = new BookieServer(conf, provider.getStatsLogger(""), null) {
-            @Override
-            protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator,
-                                       Supplier<BookieServiceInfo> s) {
-                return b;
-            }
-        };
-
-        BookieId address = BookieImpl.getBookieId(conf);
+        ServerTester tester = new ServerTester(conf, b);
         if (bkc == null) {
             bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
         }
+        BookieId address = tester.getServer().getBookieId();
         Future<?> waitForBookie = conf.isForceReadOnlyBookie()
             ? bkc.waitForReadOnlyBookie(address)
             : bkc.waitForWritableBookie(address);
 
-        server.start();
-        bsLoggers.put(server.getBookieId(), provider);
+        tester.getServer().start();
 
         waitForBookie.get(30, TimeUnit.SECONDS);
         LOG.info("New bookie '{}' has been created.", address);
 
-        try {
-            startAutoRecovery(server, conf);
-        } catch (CompatibilityException ce) {
-            LOG.error("Exception while starting AutoRecovery!", ce);
-        } catch (UnavailableException ue) {
-            LOG.error("Exception while starting AutoRecovery!", ue);
+        if (isAutoRecoveryEnabled()) {
+            tester.startAutoRecovery();
         }
-        return server;
+        return tester;
     }
 
     public void setMetastoreImplClass(AbstractConfiguration conf) {
@@ -770,36 +733,14 @@ public abstract class BookKeeperClusterTestCase {
         return isAutoRecoveryEnabled;
     }
 
-    private void startAutoRecovery(BookieServer bserver,
-                                   ServerConfiguration conf) throws Exception {
-        if (isAutoRecoveryEnabled()) {
-            AutoRecoveryMain autoRecoveryProcess = new AutoRecoveryMain(conf);
-            autoRecoveryProcess.start();
-            autoRecoveryProcesses.put(bserver, autoRecoveryProcess);
-            LOG.debug("Starting Auditor Recovery for the bookie:"
-                    + bserver.getBookieId());
-        }
-    }
-
-    private void stopAutoRecoveryService(BookieServer toRemove) throws Exception {
-        AutoRecoveryMain autoRecoveryMain = autoRecoveryProcesses
-                .remove(toRemove);
-        if (null != autoRecoveryMain && isAutoRecoveryEnabled()) {
-            autoRecoveryMain.shutdown();
-            LOG.debug("Shutdown auto recovery for bookieserver:"
-                    + toRemove.getBookieId());
-        }
-    }
-
     /**
      * Will starts the auto recovery process for the bookie servers. One auto
      * recovery process per each bookie server, if isAutoRecoveryEnabled is
      * enabled.
      */
     public void startReplicationService() throws Exception {
-        int index = -1;
-        for (BookieServer bserver : bs) {
-            startAutoRecovery(bserver, bsConfs.get(++index));
+        for (ServerTester t : servers) {
+            t.startAutoRecovery();
         }
     }
 
@@ -808,22 +749,16 @@ public abstract class BookKeeperClusterTestCase {
      * isAutoRecoveryEnabled is true.
      */
     public void stopReplicationService() throws Exception{
-        if (!isAutoRecoveryEnabled()){
-            return;
-        }
-        for (Entry<BookieServer, AutoRecoveryMain> autoRecoveryProcess : autoRecoveryProcesses
-                .entrySet()) {
-            autoRecoveryProcess.getValue().shutdown();
-            LOG.debug("Shutdown Auditor Recovery for the bookie:"
-                    + autoRecoveryProcess.getKey().getBookieId());
+        for (ServerTester t : servers) {
+            t.stopAutoRecovery();
         }
     }
 
     public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception {
         final long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
         while (System.nanoTime() < timeoutAt) {
-            for (AutoRecoveryMain p : autoRecoveryProcesses.values()) {
-                Auditor a = p.getAuditor();
+            for (ServerTester t : servers) {
+                Auditor a = t.getAuditor();
                 if (a != null) {
                     return a;
                 }
@@ -849,17 +784,99 @@ public abstract class BookKeeperClusterTestCase {
     }
 
     public void resetBookieOpLoggers() {
-        for (TestStatsProvider provider : bsLoggers.values()) {
-            provider.clear();
-        }
+        servers.forEach(t -> t.getStatsProvider().clear());
     }
 
-    public TestStatsProvider getStatsProvider(BookieId addr) {
-        return bsLoggers.get(addr);
+    public TestStatsProvider getStatsProvider(BookieId addr) throws UnknownHostException {
+        return byAddress(addr).get().getStatsProvider();
     }
 
     public TestStatsProvider getStatsProvider(int index) throws Exception {
-        return getStatsProvider(bs.get(index).getBookieId());
+        return servers.get(index).getStatsProvider();
     }
 
+    /**
+     * Class to encapsulate all the test objects.
+     */
+    public static class ServerTester {
+        private final ServerConfiguration conf;
+        private final TestStatsProvider provider;
+        private final BookieServer server;
+        private final BookieSocketAddress address;
+        private AutoRecoveryMain autoRecovery;
+
+        ServerTester(ServerConfiguration conf) throws Exception {
+            this.conf = conf;
+            provider = new TestStatsProvider();
+
+            server = new BookieServer(conf, provider.getStatsLogger(""), null);
+            address = BookieImpl.getBookieAddress(conf);
+
+            autoRecovery = null;
+        }
+
+        ServerTester(ServerConfiguration conf, Bookie b) throws Exception {
+            this.conf = conf;
+            provider = new TestStatsProvider();
+
+            server = new BookieServer(conf, provider.getStatsLogger(""), null) {
+                    @Override
+                    protected Bookie newBookie(ServerConfiguration conf,
+                                               ByteBufAllocator allocator,
+                                               Supplier<BookieServiceInfo> bookieServiceInfoProvider) {
+                        return b;
+                    }
+                };
+
+            address = BookieImpl.getBookieAddress(conf);
+
+            autoRecovery = null;
+        }
+
+        void startAutoRecovery() throws Exception {
+            LOG.debug("Starting Auditor Recovery for the bookie: {}", address);
+            autoRecovery = new AutoRecoveryMain(conf);
+            autoRecovery.start();
+        }
+
+        void stopAutoRecovery() {
+            if (autoRecovery != null) {
+                LOG.debug("Shutdown Auditor Recovery for the bookie: {}", address);
+                autoRecovery.shutdown();
+            }
+        }
+
+        Auditor getAuditor() {
+            if (autoRecovery != null) {
+                return autoRecovery.getAuditor();
+            } else {
+                return null;
+            }
+        }
+
+        ServerConfiguration getConfiguration() {
+            return conf;
+        }
+
+        public BookieServer getServer() {
+            return server;
+        }
+
+        TestStatsProvider getStatsProvider() {
+            return provider;
+        }
+
+        BookieSocketAddress getAddress() {
+            return address;
+        }
+
+        void shutdown() throws Exception {
+            server.shutdown();
+
+            if (autoRecovery != null) {
+                LOG.debug("Shutdown auto recovery for bookieserver: {}", address);
+                autoRecovery.shutdown();
+            }
+        }
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
index 944203c..81376fb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java
@@ -89,6 +89,7 @@ public class BookieClientTest {
             .setJournalDirName(tmpDir.getPath())
             .setLedgerDirNames(new String[] { tmpDir.getPath() })
             .setMetadataServiceUri(null);
+
         bs = new BookieServer(conf);
         bs.start();
         eventLoopGroup = new NioEventLoopGroup();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
index 7b1f6e0..e7da5cc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieFailureTest.java
@@ -99,35 +99,36 @@ public class BookieFailureTest extends BookKeeperClusterTestCase
      * @throws IOException
      */
     @Test
-    public void testAsyncBK1() throws IOException {
+    public void testAsyncBK1() throws Exception {
         LOG.info("#### BK1 ####");
-        auxTestReadWriteAsyncSingleClient(bs.get(0));
+        auxTestReadWriteAsyncSingleClient(serverByIndex(0));
     }
 
     @Test
-    public void testAsyncBK2() throws IOException {
+    public void testAsyncBK2() throws Exception {
         LOG.info("#### BK2 ####");
-        auxTestReadWriteAsyncSingleClient(bs.get(1));
+        auxTestReadWriteAsyncSingleClient(serverByIndex(1));
     }
 
     @Test
-    public void testAsyncBK3() throws IOException {
+    public void testAsyncBK3() throws Exception {
         LOG.info("#### BK3 ####");
-        auxTestReadWriteAsyncSingleClient(bs.get(2));
+        auxTestReadWriteAsyncSingleClient(serverByIndex(2));
     }
 
     @Test
-    public void testAsyncBK4() throws IOException {
+    public void testAsyncBK4() throws Exception {
         LOG.info("#### BK4 ####");
-        auxTestReadWriteAsyncSingleClient(bs.get(3));
+        auxTestReadWriteAsyncSingleClient(serverByIndex(3));
     }
 
     @Test
     public void testBookieRecovery() throws Exception {
-        //Shutdown all but 1 bookie
-        bs.get(0).shutdown();
-        bs.get(1).shutdown();
-        bs.get(2).shutdown();
+        //Shutdown all but 1 bookie (should be in it's own test case with 1 bookie)
+        assertEquals(4, bookieCount());
+        killBookie(0);
+        killBookie(0);
+        killBookie(0);
 
         byte[] passwd = "blah".getBytes();
         LedgerHandle lh = bkc.createLedger(1, 1, digestType, passwd);
@@ -138,10 +139,8 @@ public class BookieFailureTest extends BookKeeperClusterTestCase
             lh.addEntry(data);
         }
 
-        bs.get(3).shutdown();
-        BookieServer server = new BookieServer(bsConfs.get(3));
-        server.start();
-        bs.set(3, server);
+        assertEquals(1, bookieCount());
+        restartBookies();
 
         assertEquals(numEntries - 1 , lh.getLastAddConfirmed());
         Enumeration<LedgerEntry> entries = lh.readEntries(0, lh.getLastAddConfirmed());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
index 83d7ba1..ae2ef04 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieJournalRollingTest.java
@@ -23,19 +23,15 @@ package org.apache.bookkeeper.test;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.util.Enumeration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -190,7 +186,7 @@ public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
         Thread.sleep(2000);
 
         // verify that we only keep at most journal files
-        for (File journalDir : tmpDirs) {
+        for (File journalDir : bookieJournalDirs()) {
             File[] journals = journalDir.listFiles();
             int numJournals = 0;
             for (File f : journals) {
@@ -221,11 +217,12 @@ public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
         }
 
         // set flush interval to a large value
-        ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
-        newConf.setFlushInterval(999999999);
-        newConf.setAllowEphemeralPorts(false);
         // restart bookies
-        restartBookies(newConf);
+        restartBookies(c -> {
+                c.setFlushInterval(999999999);
+                c.setAllowEphemeralPorts(false);
+                return c;
+            });
 
         // Write enough ledger entries so that we roll over journals
         LedgerHandle[] lhs = writeLedgerEntries(4, 1024, 1024);
@@ -238,7 +235,7 @@ public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
         // ledger indexes are not flushed
         // and after bookies restarted, journals will be relayed
         // ensure that we can still read the entries
-        restartBookies(newConf);
+        restartBookies();
         validLedgerEntries(ledgerIds, 1024, 1024);
     }
 
@@ -260,11 +257,12 @@ public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
         Thread.sleep(3 * baseConf.getFlushInterval());
 
         // restart bookies with flush interval set to a large value
-        ServerConfiguration newConf = TestBKConfiguration.newServerConfiguration();
-        newConf.setFlushInterval(999999999);
-        newConf.setAllowEphemeralPorts(false);
         // restart bookies
-        restartBookies(newConf);
+        restartBookies(c -> {
+                c.setFlushInterval(999999999);
+                c.setAllowEphemeralPorts(false);
+                return c;
+            });
 
         // Write entries again to let them existed in journal
         writeLedgerEntries(lhs, 1024, 10);
@@ -274,10 +272,10 @@ public class BookieJournalRollingTest extends BookKeeperClusterTestCase {
             bkc.deleteLedger(lh.getId());
         }
         // wait for gc
-        Thread.sleep(2 * newConf.getGcWaitTime());
+        Thread.sleep(2 * confByIndex(0).getGcWaitTime());
 
         // restart bookies again to trigger replaying journal
-        restartBookies(newConf);
+        restartBookies();
     }
 
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
index 614d3bc..1aacd8a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ForceReadOnlyBookieTest.java
@@ -65,17 +65,17 @@ public class ForceReadOnlyBookieTest extends BookKeeperClusterTestCase {
         LOG.info("successed prepare");
 
         // start bookie 1 as readonly
-        bsConfs.get(1).setReadOnlyModeEnabled(true);
-        bsConfs.get(1).setForceReadOnlyBookie(true);
+        confByIndex(1).setReadOnlyModeEnabled(true);
+        confByIndex(1).setForceReadOnlyBookie(true);
         restartBookies();
-        Bookie bookie = bs.get(1).getBookie();
+        Bookie bookie = serverByIndex(1).getBookie();
 
         assertTrue("Bookie should be running and in readonly mode",
                 bookie.isRunning() && bookie.isReadOnly());
         LOG.info("successed force start ReadOnlyBookie");
 
         // Check new bookie with readonly mode enabled.
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1, ledgerDirs.length);
 
         // kill the writable bookie
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
index 90c48bd..68ab21b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LedgerDeleteTest.java
@@ -130,7 +130,7 @@ public class LedgerDeleteTest extends BookKeeperClusterTestCase {
         Thread.sleep(2000);
 
         // Verify that the first entry log (0.log) has been deleted from all of the Bookie Servers.
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found the entry log file (0.log) that should have been deleted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0));
         }
@@ -169,7 +169,7 @@ public class LedgerDeleteTest extends BookKeeperClusterTestCase {
          * test, a new entry log is created. We know then that the first two
          * entry logs should be deleted.
          */
-        for (File ledgerDirectory : tmpDirs) {
+        for (File ledgerDirectory : bookieLedgerDirs()) {
             assertFalse("Found the entry log file ([0,1].log) that should have been deleted in ledgerDirectory: "
                 + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, true, 0, 1));
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LocalBookiesRegistryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LocalBookiesRegistryTest.java
index 661ef21..c81a093 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LocalBookiesRegistryTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LocalBookiesRegistryTest.java
@@ -23,8 +23,6 @@ package org.apache.bookkeeper.test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-
-import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.proto.LocalBookiesRegistry;
 import org.junit.Test;
 
@@ -41,9 +39,7 @@ public class LocalBookiesRegistryTest extends BookKeeperClusterTestCase {
 
     @Test
     public void testAccessibleLocalBookiesRegistry() throws Exception {
-        assertEquals(1, bs.size());
-        for (BookieServer bk : bs) {
-            assertTrue(LocalBookiesRegistry.isLocalBookie(bk.getBookieId()));
-        }
+        assertEquals(1, bookieCount());
+        bookieAddresses().forEach(a -> assertTrue(LocalBookiesRegistry.isLocalBookie(a)));
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
index 1a9d7c1..705098a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ReadOnlyBookieTest.java
@@ -63,10 +63,10 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
                 "".getBytes());
 
         // Check new bookie with readonly mode enabled.
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1,
                 ledgerDirs.length);
-        Bookie bookie = bs.get(1).getBookie();
+        Bookie bookie = serverByIndex(1).getBookie();
         LedgerDirsManager ledgerDirsManager = ((BookieImpl) bookie).getLedgerDirsManager();
 
         for (int i = 0; i < 10; i++) {
@@ -106,10 +106,10 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
                 "".getBytes());
 
         // Check new bookie with readonly mode enabled.
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1,
                 ledgerDirs.length);
-        BookieImpl bookie = (BookieImpl) bs.get(1).getBookie();
+        BookieImpl bookie = (BookieImpl) serverByIndex(1).getBookie();
         LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
 
         for (int i = 0; i < 10; i++) {
@@ -128,7 +128,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
             // Expected
         }
 
-        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(1)))
+        bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(confByIndex(1)))
             .get(30, TimeUnit.SECONDS);
 
         LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
@@ -146,7 +146,7 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
         // Now add the current ledger dir back to writable dirs list
         ledgerDirsManager.addToWritableDirs(testDir, true);
 
-        bkc.waitForWritableBookie(BookieImpl.getBookieId(bsConfs.get(1)))
+        bkc.waitForWritableBookie(BookieImpl.getBookieId(confByIndex(1)))
             .get(30, TimeUnit.SECONDS);
 
         LOG.info("bookie is running {}, readonly {}.", bookie.isRunning(), bookie.isReadOnly());
@@ -173,10 +173,10 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
         baseConf.setReadOnlyModeEnabled(false);
         startNewBookie();
 
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1,
                 ledgerDirs.length);
-        BookieImpl bookie = (BookieImpl) bs.get(1).getBookie();
+        BookieImpl bookie = (BookieImpl) serverByIndex(1).getBookie();
         LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
                 "".getBytes());
         LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
@@ -211,10 +211,10 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
             throws Exception {
         startNewBookieWithMultipleLedgerDirs(2);
 
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 2,
                 ledgerDirs.length);
-        BookieImpl bookie = (BookieImpl) bs.get(1).getBookie();
+        BookieImpl bookie = (BookieImpl) serverByIndex(1).getBookie();
         LedgerHandle ledger = bkc.createLedger(2, 2, DigestType.MAC,
                 "".getBytes());
         LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
@@ -236,21 +236,19 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
 
     private void startNewBookieWithMultipleLedgerDirs(int numOfLedgerDirs)
             throws Exception {
-        ServerConfiguration conf = bsConfs.get(1);
+        ServerConfiguration conf = confByIndex(1);
         killBookie(1);
 
         File[] ledgerDirs = new File[numOfLedgerDirs];
         for (int i = 0; i < numOfLedgerDirs; i++) {
             File dir = createTempDir("bookie", "test");
-            tmpDirs.add(dir);
             ledgerDirs[i] = dir;
         }
 
         ServerConfiguration newConf = newServerConfiguration(
                 PortManager.nextFreePort(),
                 ledgerDirs[0], ledgerDirs);
-        bsConfs.add(newConf);
-        bs.add(startBookie(newConf));
+        startAndAddBookie(newConf);
     }
 
     /**
@@ -261,9 +259,10 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
         killBookie(1);
         baseConf.setReadOnlyModeEnabled(true);
         startNewBookie();
-        bs.get(1).getBookie().getStateManager().transitionToReadOnlyMode().get();
+
+        serverByIndex(1).getBookie().getStateManager().transitionToReadOnlyMode().get();
         try {
-            bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(bsConfs.get(1)))
+            bkc.waitForReadOnlyBookie(BookieImpl.getBookieId(confByIndex(1)))
                 .get(30, TimeUnit.SECONDS);
 
             bkc.createLedger(2, 2, DigestType.CRC32, "".getBytes());
@@ -282,14 +281,14 @@ public class ReadOnlyBookieTest extends BookKeeperClusterTestCase {
             ledger.addEntry("data".getBytes());
         }
         ledger.close();
-        bsConfs.get(1).setReadOnlyModeEnabled(true);
-        bsConfs.get(1).setDiskCheckInterval(500);
+        confByIndex(1).setReadOnlyModeEnabled(true);
+        confByIndex(1).setDiskCheckInterval(500);
         restartBookies();
 
         // Check new bookie with readonly mode enabled.
-        File[] ledgerDirs = bsConfs.get(1).getLedgerDirs();
+        File[] ledgerDirs = confByIndex(1).getLedgerDirs();
         assertEquals("Only one ledger dir should be present", 1, ledgerDirs.length);
-        BookieImpl bookie = (BookieImpl) bs.get(1).getBookie();
+        BookieImpl bookie = (BookieImpl) serverByIndex(1).getBookie();
         LedgerDirsManager ledgerDirsManager = bookie.getLedgerDirsManager();
 
         // Now add the current ledger dir to filled dirs list
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index d235073..f4b59d8 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.security.cert.Certificate;
@@ -38,7 +37,6 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.auth.AuthCallbacks;
 import org.apache.bookkeeper.auth.AuthToken;
 import org.apache.bookkeeper.auth.BookieAuthProvider;
@@ -64,6 +62,7 @@ import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.TestStatsProvider;
 import org.apache.bookkeeper.tls.TLSContextFactory.KeyStoreType;
 import org.apache.bookkeeper.util.IOUtils;
+import org.apache.bookkeeper.util.TestUtils;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -242,7 +241,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
         ServerConfiguration bookieConf = newServerConfiguration().setTLSKeyStore(null);
 
         try {
-            bs.add(startBookie(bookieConf));
+            startAndAddBookie(bookieConf);
             fail("Shouldn't have been able to start");
         } catch (SecurityException se) {
             assertTrue(true);
@@ -261,13 +260,12 @@ public class TestTLS extends BookKeeperClusterTestCase {
 
         // restart a bookie with bad cert
         int restartBookieIdx = 0;
-        ServerConfiguration bookieConf = bsConfs.get(restartBookieIdx)
+        ServerConfiguration bookieConf = confByIndex(restartBookieIdx)
                 .setTLSCertificatePath(getResourcePath("client-cert.pem"));
         killBookie(restartBookieIdx);
         LOG.info("Sleeping for 1s before restarting bookie with bad cert");
         Thread.sleep(1000);
-        bs.add(startBookie(bookieConf));
-        bsConfs.add(bookieConf);
+        startAndAddBookie(bookieConf);
 
         // Create ledger and write entries
         BookKeeper client = new BookKeeper(clientConf);
@@ -292,7 +290,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
     public void testStartTLSServerBadPassword() throws Exception {
         ServerConfiguration bookieConf = newServerConfiguration().setTLSKeyStorePasswordPath("badpassword");
         try {
-            bs.add(startBookie(bookieConf));
+            startAndAddBookie(bookieConf);
             fail("Shouldn't have been able to start");
         } catch (SecurityException se) {
             assertTrue(true);
@@ -345,12 +343,12 @@ public class TestTLS extends BookKeeperClusterTestCase {
         if (useV2Protocol) {
             return;
         }
-        ServerConfiguration serverConf = new ServerConfiguration();
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setDisableServerSocketBind(true);
-            conf.setEnableLocalTransport(true);
-        }
-        restartBookies(serverConf);
+
+        restartBookies(c -> {
+                c.setDisableServerSocketBind(true);
+                c.setEnableLocalTransport(true);
+                return c;
+            });
 
         ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
         testClient(clientConf, numBookies);
@@ -362,8 +360,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
     @Test
     public void testRefreshDurationForBookieCerts() throws Exception {
         assumeTrue(serverKeyStoreFormat == KeyStoreType.PEM);
-        ServerConfiguration serverConf = new ServerConfiguration();
-        String originalTlsKeyFilePath = bsConfs.get(0).getTLSKeyStore();
+        String originalTlsKeyFilePath = confByIndex(0).getTLSKeyStore();
         String invalidServerKey = getResourcePath("client-key.pem");
         File originalTlsCertFile = new File(originalTlsKeyFilePath);
         File newTlsKeyFile = IOUtils.createTempFileAndDeleteOnExit(originalTlsKeyFilePath, "refresh");
@@ -373,11 +370,11 @@ public class TestTLS extends BookKeeperClusterTestCase {
         // copy invalid cert to new temp file
         FileUtils.copyFile(invalidServerKeyFile, newTlsKeyFile);
         long refreshDurationInSec = 1;
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setTLSCertFilesRefreshDurationSeconds(1);
-            conf.setTLSKeyStore(newTlsKeyFile.getAbsolutePath());
-        }
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSCertFilesRefreshDurationSeconds(1);
+                c.setTLSKeyStore(newTlsKeyFile.getAbsolutePath());
+                return c;
+            });
 
         ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
         try {
@@ -477,9 +474,10 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testConnectToTLSClusterTLSClientWithTLSNoAuthentication() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setTLSClientAuthentication(false);
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSClientAuthentication(false);
+                return c;
+            });
 
         ClientConfiguration conf = new ClientConfiguration(baseClientConf);
         testClient(conf, numBookies);
@@ -517,11 +515,10 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testClientWantsTLSNoServersHaveIt() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration();
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setTLSProviderFactoryClass(null);
-        }
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSProviderFactoryClass(null);
+                return c;
+            });
 
         ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
         try {
@@ -539,11 +536,10 @@ public class TestTLS extends BookKeeperClusterTestCase {
     @Test
     public void testTLSClientButOnlyFewTLSServers() throws Exception {
         // disable TLS on initial set of bookies
-        ServerConfiguration serverConf = new ServerConfiguration();
-        for (ServerConfiguration conf : bsConfs) {
-            conf.setTLSProviderFactoryClass(null);
-        }
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSProviderFactoryClass(null);
+                return c;
+            });
 
         // add two bookies which support TLS
         baseConf.setTLSProviderFactoryClass(TLSContextFactory.class.getName());
@@ -591,9 +587,11 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieAuthPluginRequireClientTLSAuthentication() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setBookieAuthProviderFactoryClass(AllowOnlyClientsWithX509Certificates.class.getName());
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setBookieAuthProviderFactoryClass(
+                        AllowOnlyClientsWithX509Certificates.class.getName());
+                return c;
+            });
 
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
@@ -617,11 +615,13 @@ public class TestTLS extends BookKeeperClusterTestCase {
             return;
         }
 
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setBookieAuthProviderFactoryClass(AllowOnlyClientsWithX509Certificates.class.getName());
-        serverConf.setDisableServerSocketBind(true);
-        serverConf.setEnableLocalTransport(true);
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setBookieAuthProviderFactoryClass(
+                        AllowOnlyClientsWithX509Certificates.class.getName());
+                c.setDisableServerSocketBind(true);
+                c.setEnableLocalTransport(true);
+                return c;
+            });
 
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
@@ -644,10 +644,11 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testRoleBasedAuthZInCertificate() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setBookieAuthProviderFactoryClass(BookieAuthZFactory.class.getCanonicalName());
-        serverConf.setAuthorizedRoles("testRole,testRole1");
-        restartBookies(serverConf);
+        restartBookies(serverConf -> {
+            serverConf.setBookieAuthProviderFactoryClass(BookieAuthZFactory.class.getCanonicalName());
+            serverConf.setAuthorizedRoles("testRole,testRole1");
+            return serverConf;
+        });
 
         ClientConfiguration clientConf = new ClientConfiguration(baseClientConf);
 
@@ -663,10 +664,12 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieAuthPluginDenyAccesstoClientWithoutTLSAuthentication() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setTLSClientAuthentication(false);
-        serverConf.setBookieAuthProviderFactoryClass(AllowOnlyClientsWithX509Certificates.class.getName());
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSClientAuthentication(false);
+                c.setBookieAuthProviderFactoryClass(
+                        AllowOnlyClientsWithX509Certificates.class.getName());
+                return c;
+            });
 
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
@@ -693,12 +696,14 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieAuthPluginDenyAccessToClientWithoutTLSAuthenticationLocal() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setTLSClientAuthentication(false);
-        serverConf.setBookieAuthProviderFactoryClass(AllowOnlyClientsWithX509Certificates.class.getName());
-        serverConf.setDisableServerSocketBind(true);
-        serverConf.setEnableLocalTransport(true);
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setTLSClientAuthentication(false);
+                c.setBookieAuthProviderFactoryClass(
+                        AllowOnlyClientsWithX509Certificates.class.getName());
+                c.setDisableServerSocketBind(true);
+                c.setEnableLocalTransport(true);
+                return c;
+            });
 
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
@@ -725,9 +730,11 @@ public class TestTLS extends BookKeeperClusterTestCase {
      */
     @Test
     public void testBookieAuthPluginDenyAccessToClientWithoutTLS() throws Exception {
-        ServerConfiguration serverConf = new ServerConfiguration(baseConf);
-        serverConf.setBookieAuthProviderFactoryClass(AllowOnlyClientsWithX509Certificates.class.getName());
-        restartBookies(serverConf);
+        restartBookies(c -> {
+                c.setBookieAuthProviderFactoryClass(
+                        AllowOnlyClientsWithX509Certificates.class.getName());
+                return c;
+            });
 
         secureBookieSideChannel = false;
         secureBookieSideChannelPrincipals = null;
@@ -837,7 +844,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
 
         ServerConfiguration bookieConf = newServerConfiguration();
         bookieConf.setTLSProviderFactoryClass(TLSContextFactory.class.getName());
-        bs.add(startBookie(bookieConf));
+        startAndAddBookie(bookieConf);
         testClient(clientConf, origNumBookies + 1);
     }
 
@@ -881,16 +888,16 @@ public class TestTLS extends BookKeeperClusterTestCase {
 
         // verify stats
         for (int i = 0; i < numBookies; i++) {
-            BookieServer bookie = bs.get(i);
+            BookieServer bookie = serverByIndex(i);
             StringBuilder nameBuilder = new StringBuilder(BookKeeperClientStats.CHANNEL_SCOPE)
                     .append(".")
                     .append("bookie_")
-                    .append(bookie.getBookieId().toString()
-                    .replace('.', '_')
-                    .replace('-', '_'))
+                    .append(TestUtils.buildStatsCounterPathFromBookieID(bookie.getBookieId()))
                     .append(".");
-
             // check stats on TLS enabled client
+           TestStatsProvider.TestCounter cntr =  tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
+                    + BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER);
+
             assertEquals("Mismatch TLS channel count", 1,
                     tlsClient.getTestStatsProvider().getCounter(nameBuilder.toString()
                     + BookKeeperClientStats.ACTIVE_TLS_CHANNEL_COUNTER).get().longValue());
@@ -939,7 +946,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
 
         // restart a bookie with wrong trust store
         int restartBookieIdx = 0;
-        ServerConfiguration badBookieConf = bsConfs.get(restartBookieIdx);
+        ServerConfiguration badBookieConf = confByIndex(restartBookieIdx);
 
         switch (serverTrustStoreFormat) {
             case PEM:
@@ -961,10 +968,7 @@ public class TestTLS extends BookKeeperClusterTestCase {
         killBookie(restartBookieIdx);
         LOG.info("Sleeping for 1s before restarting bookie with bad cert");
         Thread.sleep(1000);
-        BookieServer bookie = startBookie(badBookieConf);
-        bs.add(bookie);
-        bsConfs.add(badBookieConf);
-
+        BookieServer bookie = startAndAddBookie(badBookieConf).getServer();
         // Create ledger and write entries
         TestStatsProvider testStatsProvider = new TestStatsProvider();
         BookKeeperTestClient client = new BookKeeperTestClient(clientConf, testStatsProvider);
@@ -986,11 +990,8 @@ public class TestTLS extends BookKeeperClusterTestCase {
         StringBuilder nameBuilder = new StringBuilder(BookKeeperClientStats.CHANNEL_SCOPE)
                 .append(".")
                 .append("bookie_")
-                .append(bookie.getBookieId().toString()
-                        .replace('.', '_')
-                        .replace('-', '_'))
+                .append(TestUtils.buildStatsCounterPathFromBookieID(bookie.getBookieId()))
                 .append(".");
-
         assertEquals("TLS handshake failure expected", 1,
                 client.getTestStatsProvider().getCounter(nameBuilder.toString()
                 + BookKeeperClientStats.FAILED_TLS_HANDSHAKE_COUNTER).get().longValue());
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
index 7a15b5d..462d472 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/TestUtils.java
@@ -33,6 +33,7 @@ import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.ReadHandle;
 
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.commons.io.FileUtils;
 import org.junit.Assert;
 
@@ -44,6 +45,10 @@ public final class TestUtils {
 
     private TestUtils() {}
 
+    public static String buildStatsCounterPathFromBookieID(BookieId bookieId) {
+        return bookieId.toString().replace('.', '_').replace('-', '_').replace(":", "_");
+    }
+
     public static boolean hasLogFiles(File ledgerDirectory, boolean partial, Integer... logsId) {
         boolean result = partial ? false : true;
         Set<Integer> logs = new HashSet<Integer>();