You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/01/31 19:10:55 UTC

[GitHub] sijie closed pull request #1069: BP-26 (task-2): make distributedlog modules be able to be built in bookkeeper repo

sijie closed pull request #1069: BP-26 (task-2): make distributedlog modules be able to be built in bookkeeper repo
URL: https://github.com/apache/bookkeeper/pull/1069
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign (forked) pull request once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/.test-infra/jenkins/common_job_properties.groovy b/.test-infra/jenkins/common_job_properties.groovy
index 9c99cb5e4..7ff5fe1ff 100644
--- a/.test-infra/jenkins/common_job_properties.groovy
+++ b/.test-infra/jenkins/common_job_properties.groovy
@@ -42,7 +42,7 @@ class common_job_properties {
   static void setTopLevelMainJobProperties(context,
                                            String branch = 'master',
                                            String jdkVersion = 'JDK 1.8 (latest)',
-                                           int timeout = 100,
+                                           int timeout = 200,
                                            String jenkinsExecutorLabel = 'ubuntu') {
     // GitHub project.
     context.properties {
diff --git a/.test-infra/jenkins/job_bookkeeper_precommit_java8.groovy b/.test-infra/jenkins/job_bookkeeper_precommit_java8.groovy
index 0bd2c8d93..5b6be305b 100644
--- a/.test-infra/jenkins/job_bookkeeper_precommit_java8.groovy
+++ b/.test-infra/jenkins/job_bookkeeper_precommit_java8.groovy
@@ -30,7 +30,7 @@ mavenJob('bookkeeper_precommit_pullrequest_java8') {
     delegate,
     'master',
     'JDK 1.8 (latest)',
-    120)
+    200)
 
   // Sets that this is a PreCommit job.
   common_job_properties.setPreCommit(delegate, 'Maven clean install (Java 8)')
diff --git a/.test-infra/jenkins/job_bookkeeper_precommit_java9.groovy b/.test-infra/jenkins/job_bookkeeper_precommit_java9.groovy
index a41bcfd68..d0cea2b2c 100644
--- a/.test-infra/jenkins/job_bookkeeper_precommit_java9.groovy
+++ b/.test-infra/jenkins/job_bookkeeper_precommit_java9.groovy
@@ -30,7 +30,7 @@ mavenJob('bookkeeper_precommit_pullrequest_java9') {
     delegate,
     'master',
     'JDK 1.9 (latest)',
-    120)
+    200)
 
   // Sets that this is a PreCommit job.
   common_job_properties.setPreCommit(delegate, 'Maven clean install (Java 9)')
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index 2f537c917..f0721b6ab 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -284,13 +284,16 @@
             <exclude>**/.project</exclude>
             <exclude>**/.checkstyle</exclude>
             <exclude>**/.settings/*</exclude>
-            <exclude>certs/keyStoreClientPassword.txt</exclude>
-            <exclude>certs/keyStoreServerPassword.txt</exclude>
-            <exclude>certs/trustStorePassword.txt</exclude>
-            <exclude>src/test/resources/cacerts</exclude>
+            <exclude>src/test/resources/server-key.pem</exclude>
+            <exclude>src/test/resources/server-key.p12</exclude>
+            <exclude>src/test/resources/server-key.jks</exclude>
+            <exclude>src/test/resources/server-cert.pem</exclude>
+            <exclude>src/test/resources/client-key.pem</exclude>
+            <exclude>src/test/resources/client-key.p12</exclude>
+            <exclude>src/test/resources/client-key.jks</exclude>
+            <exclude>src/test/resources/client-cert.pem</exclude>
             <exclude>src/test/resources/keyStoreClientPassword.txt</exclude>
             <exclude>src/test/resources/keyStoreServerPassword.txt</exclude>
-            <exclude>src/test/resources/trustStorePassword.txt</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -426,5 +429,30 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>tls-certs</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>1.6.0</version>
+            <executions>
+              <execution>
+                <id>Generate Self-Signed Certificates</id>
+                <phase>generate-test-resources</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+                <configuration>
+                  <workingDirectory>${basedir}/src/test/resources</workingDirectory>
+                  <executable>${basedir}/src/test/resources/generateKeysAndCerts.sh</executable>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 51aa79fff..44fcaa9bf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -3157,15 +3157,16 @@ protected void scanEntryLogForSpecificEntry(long logId, final long ledgerId, fin
         final MutableBoolean entryFound = new MutableBoolean(false);
         scanEntryLog(logId, new EntryLogScanner() {
             @Override
-            public boolean accept(long ledgerId) {
-                return (((!entryFound.booleanValue()) || (entryId == -1)));
+            public boolean accept(long candidateLedgerId) {
+                return ((candidateLedgerId == ledgerId) && ((!entryFound.booleanValue()) || (entryId == -1)));
             }
 
             @Override
-            public void process(long ledgerId, long startPos, ByteBuf entry) {
+            public void process(long candidateLedgerId, long startPos, ByteBuf entry) {
                 long entrysLedgerId = entry.getLong(entry.readerIndex());
                 long entrysEntryId = entry.getLong(entry.readerIndex() + 8);
-                if ((ledgerId == entrysLedgerId) && ((entrysEntryId == entryId)) || (entryId == -1)) {
+                if ((candidateLedgerId == entrysLedgerId) && (candidateLedgerId == ledgerId)
+                        && ((entrysEntryId == entryId) || (entryId == -1))) {
                     entryFound.setValue(true);
                     formatEntry(startPos, entry, printMsg);
                 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index 50ee81ce3..60058e363 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -66,4 +66,9 @@
     String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
     String CHANNEL_START_TLS_OP = "START_TLS";
     String CHANNEL_TIMEOUT_START_TLS_OP = "TIMEOUT_START_TLS";
+    String NETTY_EXCEPTION_CNT = "NETTY_EXCEPTION_CNT";
+    String CLIENT_CONNECT_TIMER = "CLIENT_CONNECT_TIMER";
+    String ADD_OP_OUTSTANDING = "ADD_OP_OUTSTANDING";
+    String READ_OP_OUTSTANDING = "READ_OP_OUTSTANDING";
+    String NETTY_OPS = "NETTY_OPS";
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
index f963f8b63..e772c7917 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/AbstractConfiguration.java
@@ -22,7 +22,6 @@
 import java.net.URL;
 import java.util.Iterator;
 import java.util.List;
-
 import javax.net.ssl.SSLEngine;
 
 import org.apache.bookkeeper.feature.Feature;
@@ -100,6 +99,17 @@
     */
     protected static final String TLS_ENABLED_PROTOCOLS = "tlsEnabledProtocols";
 
+    /**
+     * TLS KeyStore, TrustStore, Password files and Certificate Paths.
+     */
+    protected static final String TLS_KEYSTORE_TYPE = "tlsKeyStoreType";
+    protected static final String TLS_KEYSTORE = "tlsKeyStore";
+    protected static final String TLS_KEYSTORE_PASSWORD_PATH = "tlsKeyStorePasswordPath";
+    protected static final String TLS_TRUSTSTORE_TYPE = "tlsTrustStoreType";
+    protected static final String TLS_TRUSTSTORE = "tlsTrustStore";
+    protected static final String TLS_TRUSTSTORE_PASSWORD_PATH = "tlsTrustStorePasswordPath";
+    protected static final String TLS_CERTIFICATE_PATH = "tlsCertificatePath";
+
     //Netty configuration
     protected static final String NETTY_MAX_FRAME_SIZE = "nettyMaxFrameSizeBytes";
     protected static final int DEFAULT_NETTY_MAX_FRAME_SIZE = 5 * 1024 * 1024; // 5MB
@@ -110,6 +120,9 @@
     // Kluge for compatibility testing. Never set this outside tests.
     public static final String LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK = "ledgerManagerFactoryDisableClassCheck";
 
+    // Validate bookie process user
+    public static final String PERMITTED_STARTUP_USERS = "permittedStartupUsers";
+
     protected AbstractConfiguration() {
         super();
         if (READ_SYSTEM_PROPERTIES) {
@@ -118,6 +131,20 @@ protected AbstractConfiguration() {
         }
     }
 
+    /**
+     * Limit who can start the application to prevent future permission errors.
+     */
+    public void setPermittedStartupUsers(String s) {
+        setProperty(PERMITTED_STARTUP_USERS, s);
+    }
+
+    /**
+     * Get array of users specified in this property.
+     */
+    public String[] getPermittedStartupUsers() {
+        return getStringArray(PERMITTED_STARTUP_USERS);
+    }
+
     /**
      * You can load configurations in precedence order. The first one takes
      * precedence over any loaded later.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 4d9c77478..424ae34f1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -51,6 +51,43 @@
     // Passwd
     protected static final String PASSWD = "passwd";
 
+    // Client TLS (@deprecated since 4.7.0)
+    /**
+     * @deprecated Use {@link #TLS_KEYSTORE_TYPE}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_KEYSTORE_TYPE = "clientKeyStoreType";
+
+    /**
+     * @deprecated Use {@link #TLS_KEYSTORE}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_KEYSTORE = "clientKeyStore";
+
+    /**
+     * @deprecated Use {@link #TLS_KEYSTORE_PASSWORD_PATH}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_KEYSTORE_PASSWORD_PATH = "clientKeyStorePasswordPath";
+
+    /**
+     * @deprecated Use {@link #TLS_TRUSTSTORE_TYPE}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_TRUSTSTORE_TYPE = "clientTrustStoreType";
+
+    /**
+     * @deprecated Use {@link #TLS_TRUSTSTORE}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_TRUSTSTORE = "clientTrustStore";
+
+    /**
+     * @deprecated Use {@link #TLS_TRUSTSTORE_PASSWORD_PATH}
+     */
+    @Deprecated
+    protected static final String CLIENT_TLS_TRUSTSTORE_PASSWORD_PATH = "clientTrustStorePasswordPath";
+
     // NIO Parameters
     protected static final String CLIENT_TCP_NODELAY = "clientTcpNoDelay";
     protected static final String CLIENT_SOCK_KEEPALIVE = "clientSockKeepalive";
@@ -145,14 +182,6 @@
     // Client auth provider factory class name. It must be configured on Bookies to for the Auditor
     protected static final String CLIENT_AUTH_PROVIDER_FACTORY_CLASS = "clientAuthProviderFactoryClass";
 
-    // Client TLS
-    protected static final String TLS_KEYSTORE_TYPE = "clientKeyStoreType";
-    protected static final String TLS_KEYSTORE = "clientKeyStore";
-    protected static final String TLS_KEYSTORE_PASSWORD_PATH = "clientKeyStorePasswordPath";
-    protected static final String TLS_TRUSTSTORE_TYPE = "clientTrustStoreType";
-    protected static final String TLS_TRUSTSTORE = "clientTrustStore";
-    protected static final String TLS_TRUSTSTORE_PASSWORD_PATH = "clientTrustStorePasswordPath";
-
     // Registration Client
     protected static final String REGISTRATION_CLIENT_CLASS = "registrationClientClass";
 
@@ -1419,7 +1448,7 @@ public String getClientRole() {
      * @return
      */
     public String getTLSKeyStoreType() {
-        return getString(TLS_KEYSTORE_TYPE, "JKS");
+        return getString(CLIENT_TLS_KEYSTORE_TYPE, getString(TLS_KEYSTORE_TYPE, "JKS"));
     }
 
 
@@ -1439,7 +1468,7 @@ public ClientConfiguration setTLSKeyStoreType(String arg) {
      * @return
      */
     public String getTLSKeyStore() {
-        return getString(TLS_KEYSTORE, null);
+        return getString(CLIENT_TLS_KEYSTORE, getString(TLS_KEYSTORE, null));
     }
 
     /**
@@ -1458,7 +1487,7 @@ public ClientConfiguration setTLSKeyStore(String arg) {
      * @return
      */
     public String getTLSKeyStorePasswordPath() {
-        return getString(TLS_KEYSTORE_PASSWORD_PATH, null);
+        return getString(CLIENT_TLS_KEYSTORE_PASSWORD_PATH, getString(TLS_KEYSTORE_PASSWORD_PATH, null));
     }
 
     /**
@@ -1477,7 +1506,7 @@ public ClientConfiguration setTLSKeyStorePasswordPath(String arg) {
      * @return
      */
     public String getTLSTrustStoreType() {
-        return getString(TLS_TRUSTSTORE_TYPE, "JKS");
+        return getString(CLIENT_TLS_TRUSTSTORE_TYPE, getString(TLS_TRUSTSTORE_TYPE, "JKS"));
     }
 
     /**
@@ -1496,7 +1525,7 @@ public ClientConfiguration setTLSTrustStoreType(String arg) {
      * @return
      */
     public String getTLSTrustStore() {
-        return getString(TLS_TRUSTSTORE, null);
+        return getString(CLIENT_TLS_TRUSTSTORE, getString(TLS_TRUSTSTORE, null));
     }
 
     /**
@@ -1516,7 +1545,7 @@ public ClientConfiguration setTLSTrustStore(String arg) {
      * @return
      */
     public String getTLSTrustStorePasswordPath() {
-        return getString(TLS_TRUSTSTORE_PASSWORD_PATH, null);
+        return getString(CLIENT_TLS_TRUSTSTORE_PASSWORD_PATH, getString(TLS_TRUSTSTORE_PASSWORD_PATH, null));
     }
 
     /**
@@ -1529,6 +1558,25 @@ public ClientConfiguration setTLSTrustStorePasswordPath(String arg) {
         return this;
     }
 
+    /**
+     * Get the path to file containing TLS Certificate.
+     *
+     * @return
+     */
+    public String getTLSCertificatePath() {
+        return getString(TLS_CERTIFICATE_PATH, null);
+    }
+
+    /**
+     * Set the path to file containing TLS Certificate.
+     *
+     * @return
+     */
+    public ClientConfiguration setTLSCertificatePath(String arg) {
+        setProperty(TLS_CERTIFICATE_PATH, arg);
+        return this;
+    }
+
     /**
      * Whether to delay ensemble change or not?
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d6e4d5f37..3a7c763de 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -168,14 +168,6 @@
     protected static final String HTTP_SERVER_ENABLED = "httpServerEnabled";
     protected static final String HTTP_SERVER_PORT = "httpServerPort";
 
-    // TLS parameters
-    protected static final String TLS_KEYSTORE_TYPE = "tlsKeyStoreType";
-    protected static final String TLS_KEYSTORE = "tlsKeyStore";
-    protected static final String TLS_KEYSTORE_PASSWORD_PATH = "tlsKeyStorePasswordPath";
-    protected static final String TLS_TRUSTSTORE_TYPE = "tlsTrustStoreType";
-    protected static final String TLS_TRUSTSTORE = "tlsTrustStore";
-    protected static final String TLS_TRUSTSTORE_PASSWORD_PATH = "tlsTrustStorePasswordPath";
-
     // Lifecycle Components
     protected static final String EXTRA_SERVER_COMPONENTS = "extraServerComponents";
 
@@ -2331,7 +2323,7 @@ public String getTLSKeyStore() {
     /**
      * Set the keystore path for the client.
      *
-     * @return ServerConfiguration
+     * @return
      */
     public ServerConfiguration setTLSKeyStore(String arg) {
         setProperty(TLS_KEYSTORE, arg);
@@ -2415,6 +2407,25 @@ public ServerConfiguration setTLSTrustStorePasswordPath(String arg) {
         return this;
     }
 
+    /**
+     * Get the path to file containing TLS Certificate.
+     *
+     * @return
+     */
+    public String getTLSCertificatePath() {
+        return getString(TLS_CERTIFICATE_PATH, null);
+    }
+
+    /**
+     * Set the path to file containing TLS Certificate.
+     *
+     * @return
+     */
+    public ServerConfiguration setTLSCertificatePath(String arg) {
+        setProperty(TLS_CERTIFICATE_PATH, arg);
+        return this;
+    }
+
 
     /**
      * Whether to enable recording task execution stats.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index dc2cbcefe..3c5385c5c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -223,6 +223,7 @@ public SocketAddress getRemoteAddr() {
                             result.addAll(Arrays.asList(certificates));
                             return result;
                         } catch (SSLPeerUnverifiedException err) {
+                            LOG.error("Failed to get peer certificates", err);
                             return Collections.emptyList();
                         }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index 6186e446d..602134a98 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -22,10 +22,13 @@
 
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
+import static org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP_USERS;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.security.AccessControlException;
+import java.util.Arrays;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.BookieException;
@@ -77,6 +80,7 @@ public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
             throws IOException, KeeperException, InterruptedException,
             BookieException, UnavailableException, CompatibilityException, SecurityException {
         this.conf = conf;
+        validateUser(conf);
         this.statsLogger = statsLogger;
         this.nettyServer = new BookieNettyServer(this.conf, null);
         try {
@@ -159,6 +163,28 @@ public synchronized void shutdown() {
         running = false;
     }
 
+    /**
+     * Ensure the current user can start-up the process if it's restricted.
+     */
+    private void validateUser(ServerConfiguration conf) throws AccessControlException {
+        if (conf.containsKey(PERMITTED_STARTUP_USERS)) {
+            String currentUser = System.getProperty("user.name");
+            String[] propertyValue = conf.getPermittedStartupUsers();
+            for (String s : propertyValue) {
+                if (s.equals(currentUser)) {
+                    return;
+                }
+            }
+            String errorMsg =
+                    "System cannot start because current user isn't in permittedStartupUsers."
+                            + " Current user: " + currentUser + " permittedStartupUsers: "
+                            + Arrays.toString(propertyValue);
+            LOG.error(errorMsg);
+            throw new AccessControlException(errorMsg);
+        }
+    }
+
+
     public boolean isRunning() {
         return bookie.isRunning() && nettyServer.isRunning() && running;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index e0199ffb9..a486d5716 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -109,6 +109,7 @@
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacRequest;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.WriteLacResponse;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -173,6 +174,12 @@
     private final OpStatsLogger getBookieInfoTimeoutOpLogger;
     private final OpStatsLogger startTLSOpLogger;
     private final OpStatsLogger startTLSTimeoutOpLogger;
+    private final OpStatsLogger connectTimer;
+    private final Counter exceptionCounter;
+    private final Counter addEntryOutstanding;
+    private final Counter readEntryOutstanding;
+    /* collect stats on all Ops that flows through netty pipeline */
+    private final OpStatsLogger nettyOpLogger;
 
     private final boolean useV2WireProtocol;
 
@@ -270,6 +277,11 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor exec
         getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO);
         startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP);
         startTLSTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_START_TLS_OP);
+        exceptionCounter = statsLogger.getCounter(BookKeeperClientStats.NETTY_EXCEPTION_CNT);
+        connectTimer = statsLogger.getOpStatsLogger(BookKeeperClientStats.CLIENT_CONNECT_TIMER);
+        addEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.ADD_OP_OUTSTANDING);
+        readEntryOutstanding = statsLogger.getCounter(BookKeeperClientStats.READ_OP_OUTSTANDING);
+        nettyOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.NETTY_OPS);
 
         this.pcbcPool = pcbcPool;
 
@@ -360,6 +372,7 @@ protected long getNumPendingCompletionRequests() {
     }
 
     protected ChannelFuture connect() {
+        final long startTime = MathUtils.nowInNano();
         if (LOG.isDebugEnabled()) {
             LOG.debug("Connecting to bookie: {}", addr);
         }
@@ -427,7 +440,8 @@ protected void initChannel(Channel ch) throws Exception {
         }
 
         ChannelFuture future = bootstrap.connect(bookieAddr);
-        future.addListener(new ConnectionFutureListener());
+        future.addListener(new ConnectionFutureListener(startTime));
+
         return future;
     }
 
@@ -861,7 +875,21 @@ private void writeAndFlush(final Channel channel,
         }
 
         try {
-            channel.writeAndFlush(request, channel.voidPromise());
+            final long startTime = MathUtils.nowInNano();
+            ChannelFuture future = channel.writeAndFlush(request);
+            future.addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                        completionObjects.get(key).setOutstanding();
+                    } else {
+                        nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime),
+                                TimeUnit.NANOSECONDS);
+                    }
+                }
+            });
         } catch (Throwable e) {
             LOG.warn("Operation {} failed", requestToString(request), e);
             errorOut(key);
@@ -968,6 +996,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      */
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        exceptionCounter.inc();
         if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
             LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress());
             ctx.close();
@@ -1152,7 +1181,7 @@ private static StatusCode getStatusCodeFromErrorCode(int errorCode) {
     private void readV3Response(final Response response) {
         final BKPacketHeader header = response.getHeader();
 
-        final CompletionValue completionValue = completionObjects.remove(newCompletionKey(header.getTxnId(),
+        final CompletionValue completionValue = completionObjects.get(newCompletionKey(header.getTxnId(),
                 header.getOperation()));
 
         if (null == completionValue) {
@@ -1177,6 +1206,8 @@ public String toString() {
                 }
             });
         }
+
+        completionObjects.remove(newCompletionKey(header.getTxnId(), header.getOperation()));
     }
 
     void initTLSHandshake() {
@@ -1328,6 +1359,9 @@ protected int logAndConvertStatus(StatusCode status, int defaultStatus,
 
         public abstract void errorOut();
         public abstract void errorOut(int rc);
+        public void setOutstanding() {
+            // no-op
+        }
 
         protected void errorOutAndRunCallback(final Runnable callback) {
             executor.submitOrdered(ledgerId,
@@ -1503,10 +1537,16 @@ public void errorOut(final int rc) {
                                                entryId, null, ctx));
         }
 
+        @Override
+        public void setOutstanding() {
+            readEntryOutstanding.inc();
+        }
+
         @Override
         public void handleV2Response(long ledgerId, long entryId,
                                      StatusCode status,
                                      BookieProtocol.Response response) {
+            readEntryOutstanding.dec();
             if (!(response instanceof BookieProtocol.ReadResponse)) {
                 return;
             }
@@ -1521,6 +1561,7 @@ public void handleV2Response(long ledgerId, long entryId,
 
         @Override
         public void handleV3Response(BookkeeperProtocol.Response response) {
+            readEntryOutstanding.dec();
             ReadResponse readResponse = response.getReadResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? readResponse.getStatus() : response.getStatus();
@@ -1735,16 +1776,23 @@ public void errorOut(final int rc) {
                     () -> writeComplete(rc, ledgerId, entryId, addr, ctx));
         }
 
+        @Override
+        public void setOutstanding() {
+            addEntryOutstanding.inc();
+        }
+
         @Override
         public void handleV2Response(
                 long ledgerId, long entryId, StatusCode status,
                 BookieProtocol.Response response) {
+            addEntryOutstanding.dec();
             handleResponse(ledgerId, entryId, status);
         }
 
         @Override
         public void handleV3Response(
                 BookkeeperProtocol.Response response) {
+            addEntryOutstanding.dec();
             AddResponse addResponse = response.getAddResponse();
             StatusCode status = response.getStatus() == StatusCode.EOK
                 ? addResponse.getStatus() : response.getStatus();
@@ -1912,13 +1960,29 @@ public void release() {
     /**
      * Connection listener.
      */
-    public class ConnectionFutureListener implements ChannelFutureListener {
+    class ConnectionFutureListener implements ChannelFutureListener {
+        private final long startTime;
+
+        ConnectionFutureListener(long startTime) {
+            this.startTime = startTime;
+        }
+
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             LOG.debug("Channel connected ({}) {}", future.isSuccess(), future.channel());
             int rc;
             Queue<GenericCallback<PerChannelBookieClient>> oldPendingOps;
 
+            /* We fill in the timer based on whether the connect operation itself succeeded regardless of
+             * whether there was a race */
+            if (future.isSuccess()) {
+                PerChannelBookieClient.this
+                .connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            } else {
+                PerChannelBookieClient.this
+                .connectTimer.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
+            }
+
             synchronized (PerChannelBookieClient.this) {
                 if (future.isSuccess() && state == ConnectionState.CONNECTING && future.channel().isActive()) {
                     LOG.info("Successfully connected to bookie: {}", future.channel());
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
index a2108a706..2d3f51b2a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tls/TLSContextFactory.java
@@ -34,8 +34,11 @@
 import java.security.KeyStore;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
+import java.security.NoSuchProviderException;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.Arrays;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
@@ -51,6 +54,13 @@
  * A factory to manage TLS contexts.
  */
 public class TLSContextFactory implements SecurityHandlerFactory {
+    /**
+     * Supported Key File Types.
+     */
+    public enum KeyStoreType {
+        PKCS12, JKS, PEM;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(TLSContextFactory.class);
     private static final String TLSCONTEXT_HANDLER_NAME = "tls";
     private String[] protocols;
@@ -73,6 +83,7 @@ private String getPasswordFromFile(String path) throws IOException {
     private KeyStore loadKeyStore(String keyStoreType, String keyStoreLocation, String keyStorePassword)
             throws KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException {
         KeyStore ks = KeyStore.getInstance(keyStoreType);
+
         try (FileInputStream ksin = new FileInputStream(keyStoreLocation)) {
             ks.load(ksin, keyStorePassword.trim().toCharArray());
         }
@@ -85,7 +96,7 @@ public String getHandlerName() {
 
     private KeyManagerFactory initKeyManagerFactory(String keyStoreType, String keyStoreLocation,
             String keyStorePasswordPath) throws SecurityException, KeyStoreException, NoSuchAlgorithmException,
-            CertificateException, IOException, UnrecoverableKeyException {
+            CertificateException, IOException, UnrecoverableKeyException, InvalidKeySpecException {
         KeyManagerFactory kmf = null;
 
         if (Strings.isNullOrEmpty(keyStoreLocation)) {
@@ -98,7 +109,7 @@ private KeyManagerFactory initKeyManagerFactory(String keyStoreType, String keyS
             keyStorePassword = getPasswordFromFile(keyStorePasswordPath);
         }
 
-        // Initialize key store
+        // Initialize key file
         KeyStore ks = loadKeyStore(keyStoreType, keyStoreLocation, keyStorePassword);
         kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
         kmf.init(ks, keyStorePassword.trim().toCharArray());
@@ -121,7 +132,7 @@ private TrustManagerFactory initTrustManagerFactory(String trustStoreType, Strin
             trustStorePassword = getPasswordFromFile(trustStorePasswordPath);
         }
 
-        // Initialize trust store
+        // Initialize trust file
         KeyStore ts = loadKeyStore(trustStoreType, trustStoreLocation, trustStorePassword);
         tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
         tmf.init(ts);
@@ -147,89 +158,179 @@ private SslProvider getTLSProvider(String sslProvider) {
         return SslProvider.JDK;
     }
 
-    private void createClientContext(AbstractConfiguration conf) throws SecurityException, KeyStoreException,
-            NoSuchAlgorithmException, CertificateException, IOException, UnrecoverableKeyException {
+    private void createClientContext(AbstractConfiguration conf)
+            throws SecurityException, KeyStoreException, NoSuchAlgorithmException, CertificateException, IOException,
+            UnrecoverableKeyException, InvalidKeySpecException, NoSuchProviderException {
         final SslContextBuilder sslContextBuilder;
         final ClientConfiguration clientConf;
         final SslProvider provider;
-        final boolean authentication;
-
-        KeyManagerFactory kmf = null;
-        TrustManagerFactory tmf = null;
+        final boolean clientAuthentication;
 
-        // get key-store and trust-store locations and passwords
+        // get key-file and trust-file locations and passwords
         if (!(conf instanceof ClientConfiguration)) {
             throw new SecurityException("Client configruation not provided");
         }
 
         clientConf = (ClientConfiguration) conf;
         provider = getTLSProvider(clientConf.getTLSProvider());
-        authentication = clientConf.getTLSClientAuthentication();
+        clientAuthentication = clientConf.getTLSClientAuthentication();
 
-        tmf = initTrustManagerFactory(clientConf.getTLSTrustStoreType(), clientConf.getTLSTrustStore(),
-                clientConf.getTLSTrustStorePasswordPath());
+        switch (KeyStoreType.valueOf(clientConf.getTLSTrustStoreType())) {
+        case PEM:
+            if (Strings.isNullOrEmpty(clientConf.getTLSTrustStore())) {
+                throw new SecurityException("CA Certificate required");
+            }
 
-        if (authentication) {
-            kmf = initKeyManagerFactory(clientConf.getTLSKeyStoreType(), clientConf.getTLSKeyStore(),
-                    clientConf.getTLSKeyStorePasswordPath());
+            sslContextBuilder = SslContextBuilder.forClient()
+                    .trustManager(new File(clientConf.getTLSTrustStore()))
+                    .ciphers(null)
+                    .sessionCacheSize(0)
+                    .sessionTimeout(0)
+                    .sslProvider(provider)
+                    .clientAuth(ClientAuth.REQUIRE);
+
+            break;
+        case JKS:
+            // falling thru, same as PKCS12
+        case PKCS12:
+            TrustManagerFactory tmf = initTrustManagerFactory(clientConf.getTLSTrustStoreType(),
+                    clientConf.getTLSTrustStore(), clientConf.getTLSTrustStorePasswordPath());
+
+            sslContextBuilder = SslContextBuilder.forClient()
+                    .trustManager(tmf)
+                    .ciphers(null)
+                    .sessionCacheSize(0)
+                    .sessionTimeout(0)
+                    .sslProvider(provider)
+                    .clientAuth(ClientAuth.REQUIRE);
+
+            break;
+        default:
+            throw new SecurityException("Invalid Truststore type: " + clientConf.getTLSTrustStoreType());
         }
 
-        // Build Ssl context
-        sslContextBuilder = SslContextBuilder.forClient()
-                                            .trustManager(tmf)
-                                            .ciphers(null)
-                                            .sessionCacheSize(0)
-                                            .sessionTimeout(0)
-                                            .sslProvider(provider)
-                                            .clientAuth(ClientAuth.REQUIRE);
-
-        /* if mutual authentication is enabled */
-        if (authentication) {
-            sslContextBuilder.keyManager(kmf);
+        if (clientAuthentication) {
+            switch (KeyStoreType.valueOf(clientConf.getTLSKeyStoreType())) {
+            case PEM:
+                final String keyPassword;
+
+                if (Strings.isNullOrEmpty(clientConf.getTLSCertificatePath())) {
+                    throw new SecurityException("Valid Certificate is missing");
+                }
+
+                if (Strings.isNullOrEmpty(clientConf.getTLSKeyStore())) {
+                    throw new SecurityException("Valid Key is missing");
+                }
+
+                if (!Strings.isNullOrEmpty(clientConf.getTLSKeyStorePasswordPath())) {
+                    keyPassword = getPasswordFromFile(clientConf.getTLSKeyStorePasswordPath());
+                } else {
+                    keyPassword = null;
+                }
+
+                sslContextBuilder.keyManager(new File(clientConf.getTLSCertificatePath()),
+                        new File(clientConf.getTLSKeyStore()), keyPassword);
+                break;
+            case JKS:
+                // falling thru, same as PKCS12
+            case PKCS12:
+                KeyManagerFactory kmf = initKeyManagerFactory(clientConf.getTLSKeyStoreType(),
+                        clientConf.getTLSKeyStore(), clientConf.getTLSKeyStorePasswordPath());
+
+                sslContextBuilder.keyManager(kmf);
+                break;
+            default:
+                throw new SecurityException("Invalid Keyfile type" + clientConf.getTLSKeyStoreType());
+            }
         }
 
         sslContext = sslContextBuilder.build();
     }
 
     private void createServerContext(AbstractConfiguration conf) throws SecurityException, KeyStoreException,
-            NoSuchAlgorithmException, CertificateException, IOException, UnrecoverableKeyException {
+            NoSuchAlgorithmException, CertificateException, IOException, UnrecoverableKeyException,
+            InvalidKeySpecException, IllegalArgumentException {
         final SslContextBuilder sslContextBuilder;
         final ServerConfiguration serverConf;
         final SslProvider provider;
-        final boolean authentication;
+        final boolean clientAuthentication;
 
-        KeyManagerFactory kmf = null;
-        TrustManagerFactory tmf = null;
-
-        // get key-store and trust-store locations and passwords
+        // get key-file and trust-file locations and passwords
         if (!(conf instanceof ServerConfiguration)) {
             throw new SecurityException("Server configruation not provided");
         }
 
         serverConf = (ServerConfiguration) conf;
         provider = getTLSProvider(serverConf.getTLSProvider());
-        authentication = serverConf.getTLSClientAuthentication();
+        clientAuthentication = serverConf.getTLSClientAuthentication();
+
+        switch (KeyStoreType.valueOf(serverConf.getTLSKeyStoreType())) {
+        case PEM:
+            final String keyPassword;
+
+            if (Strings.isNullOrEmpty(serverConf.getTLSKeyStore())) {
+                throw new SecurityException("Key path is required");
+            }
+
+            if (Strings.isNullOrEmpty(serverConf.getTLSCertificatePath())) {
+                throw new SecurityException("Certificate path is required");
+            }
 
-        kmf = initKeyManagerFactory(serverConf.getTLSKeyStoreType(), serverConf.getTLSKeyStore(),
-                serverConf.getTLSKeyStorePasswordPath());
+            if (!Strings.isNullOrEmpty(serverConf.getTLSKeyStorePasswordPath())) {
+                keyPassword = getPasswordFromFile(serverConf.getTLSKeyStorePasswordPath());
+            } else {
+                keyPassword = null;
+            }
 
-        if (authentication) {
-            tmf = initTrustManagerFactory(serverConf.getTLSTrustStoreType(), serverConf.getTLSTrustStore(),
-                    serverConf.getTLSTrustStorePasswordPath());
+            sslContextBuilder = SslContextBuilder
+                                .forServer(new File(serverConf.getTLSCertificatePath()),
+                            new File(serverConf.getTLSKeyStore()), keyPassword)
+                                .ciphers(null)
+                                .sessionCacheSize(0)
+                                .sessionTimeout(0)
+                                .sslProvider(provider)
+                                .startTls(true);
+
+            break;
+        case JKS:
+            // falling thru, same as PKCS12
+        case PKCS12:
+            KeyManagerFactory kmf = initKeyManagerFactory(serverConf.getTLSKeyStoreType(),
+                    serverConf.getTLSKeyStore(),
+                    serverConf.getTLSKeyStorePasswordPath());
+
+            sslContextBuilder = SslContextBuilder.forServer(kmf)
+                                .ciphers(null)
+                                .sessionCacheSize(0)
+                                .sessionTimeout(0)
+                                .sslProvider(provider)
+                                .startTls(true);
+
+            break;
+        default:
+            throw new SecurityException("Invalid Keyfile type" + serverConf.getTLSKeyStoreType());
         }
 
-        // Build Ssl context
-        sslContextBuilder = SslContextBuilder.forServer(kmf)
-                                            .ciphers(null)
-                                            .sessionCacheSize(0)
-                                            .sessionTimeout(0)
-                                            .sslProvider(provider)
-                                            .startTls(true);
-
-        /* if mutual authentication is enabled */
-        if (authentication) {
-            sslContextBuilder.trustManager(tmf)
-                            .clientAuth(ClientAuth.REQUIRE);
+        if (clientAuthentication) {
+            sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
+
+            switch (KeyStoreType.valueOf(serverConf.getTLSTrustStoreType())) {
+            case PEM:
+                if (Strings.isNullOrEmpty(serverConf.getTLSTrustStore())) {
+                    throw new SecurityException("CA Certificate chain is required");
+                }
+                sslContextBuilder.trustManager(new File(serverConf.getTLSTrustStore()));
+                break;
+            case JKS:
+                // falling thru, same as PKCS12
+            case PKCS12:
+                TrustManagerFactory tmf = initTrustManagerFactory(serverConf.getTLSTrustStoreType(),
+                        serverConf.getTLSTrustStore(), serverConf.getTLSTrustStorePasswordPath());
+                sslContextBuilder.trustManager(tmf);
+                break;
+            default:
+                throw new SecurityException("Invalid Truststore type" + serverConf.getTLSTrustStoreType());
+            }
         }
 
         sslContext = sslContextBuilder.build();
@@ -269,9 +370,15 @@ public synchronized void init(NodeType type, AbstractConfiguration conf) throws
         } catch (CertificateException e) {
             throw new SecurityException("Unable to load keystore", e);
         } catch (IOException e) {
-            throw new SecurityException("Error initializing TLSContext", e);
+            throw new SecurityException("Error initializing SSLContext", e);
         } catch (UnrecoverableKeyException e) {
-            throw new SecurityException("Unable to load key manager, possibly wrong password given", e);
+            throw new SecurityException("Unable to load key manager, possibly bad password", e);
+        } catch (InvalidKeySpecException e) {
+            throw new SecurityException("Unable to load key manager", e);
+        } catch (IllegalArgumentException e) {
+            throw new SecurityException("Invalid TLS configuration", e);
+        } catch (NoSuchProviderException e) {
+            throw new SecurityException("No such provider", e);
         }
     }
 
@@ -282,10 +389,16 @@ public SslHandler newTLSHandler() {
         if (protocols != null && protocols.length != 0) {
             sslHandler.engine().setEnabledProtocols(protocols);
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Enabled cipher protocols: {} ", Arrays.toString(sslHandler.engine().getEnabledProtocols()));
+        }
 
         if (ciphers != null && ciphers.length != 0) {
             sslHandler.engine().setEnabledCipherSuites(ciphers);
         }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Enabled cipher suites: {} ", Arrays.toString(sslHandler.engine().getEnabledCipherSuites()));
+        }
 
         return sslHandler;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 412e20d30..dbfb9abf1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -34,9 +34,11 @@
 import java.io.OutputStreamWriter;
 import java.net.BindException;
 import java.net.InetAddress;
+import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+
 import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -349,6 +351,105 @@ public void testRegNodeExistsAfterSessionTimeOut() throws Exception {
         }
     }
 
+    /**
+     * Verify the bookie cannot start if the current user is not in the permittedStartupUsers conf list; an
+     * AccessControlException is expected when startup is refused.
+     */
+    @Test
+    public void testUserNotPermittedToStart() throws Exception {
+        File tmpDir = createTempDir("bookie", "test");
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        int port = PortManager.nextFreePort();
+        conf.setZkServers(null).setBookiePort(port).setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() });
+        String userString = "larry, curly,moe,,";
+        conf.setPermittedStartupUsers(userString);
+        BookieServer bs1 = null;
+
+        boolean sawException = false;
+        try {
+            bs1 = new BookieServer(conf);
+            Assert.fail("Bookkeeper should not have started since current user isn't in permittedStartupUsers");
+        } catch (AccessControlException buae) {
+            sawException = true;
+        } finally {
+            if (bs1 != null && bs1.isRunning()) {
+                bs1.shutdown();
+            }
+        }
+        assertTrue("Should have thrown exception", sawException);
+    }
+
+    /**
+     * Verify the bookie can start if the current user is in the permittedStartupUsers conf list, even when the
+     * list contains extra commas and whitespace; an AccessControlException fails the test.
+     */
+    @Test
+    public void testUserPermittedToStart() throws Exception {
+        File tmpDir = createTempDir("bookie", "test");
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        int port = PortManager.nextFreePort();
+        conf.setZkServers(null).setBookiePort(port).setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() });
+
+        BookieServer bs1 = null;
+
+        // Multiple commas
+        String userString = "larry,,,curly ," + System.getProperty("user.name") + " ,moe";
+        conf.setPermittedStartupUsers(userString);
+        try {
+            bs1 = new BookieServer(conf);
+            bs1.start();
+        } catch (AccessControlException buae) {
+            Assert.fail("Bookkeeper should have started since current user is in permittedStartupUsers");
+        } finally {
+            if (bs1 != null && bs1.isRunning()) {
+                bs1.shutdown();
+            }
+        }
+
+        // Comma at end
+        userString = "larry ,curly, moe," + System.getProperty("user.name") + ",";
+        conf.setPermittedStartupUsers(userString);
+        try {
+            bs1 = new BookieServer(conf);
+            bs1.start();
+        } catch (AccessControlException buae) {
+            Assert.fail("Bookkeeper should have started since current user is in permittedStartupUsers");
+        } finally {
+            if (bs1 != null && bs1.isRunning()) {
+                bs1.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Verify the bookie can start when permittedStartupUsers is not specified; an AccessControlException
+     * fails the test.
+     */
+    @Test
+    public void testUserPermittedToStartWithMissingProperty() throws Exception {
+        File tmpDir = createTempDir("bookie", "test");
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        int port = PortManager.nextFreePort();
+        conf.setZkServers(null).setBookiePort(port).setJournalDirName(tmpDir.getPath())
+                .setLedgerDirNames(new String[] { tmpDir.getPath() });
+        BookieServer bs1 = null;
+        try {
+            bs1 = new BookieServer(conf);
+            bs1.start();
+        } catch (AccessControlException buae) {
+            Assert.fail("Bookkeeper should have started since permittedStartupUser is not specified");
+        } finally {
+            if (bs1 != null && bs1.isRunning()) {
+                bs1.shutdown();
+            }
+        }
+    }
+
     /**
      * Verify duplicate bookie server startup. Should throw
      * java.net.BindException if already BK server is running
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/PortManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/PortManager.java
index d52e9c7f8..0a4b8ab0e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/PortManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/PortManager.java
@@ -20,9 +20,7 @@
  */
 package org.apache.bookkeeper.test;
 
-import java.io.IOException;
 import java.net.ServerSocket;
-
 /**
  * Port manager allows a base port to be specified on the commandline.
  * Tests will then use ports, counting up from this base port.
@@ -32,19 +30,18 @@
     private static int nextPort = getBasePort();
 
     public static synchronized int nextFreePort() {
+        int exceptionCount = 0;
         while (true) {
-            ServerSocket ss = null;
-            try {
-                int port = nextPort++;
-                ss = new ServerSocket(port);
-                ss.setReuseAddress(true);
+            int port = nextPort++;
+            try (ServerSocket ss = new ServerSocket(port)) {
+                ss.close();
+                //Give it some time to truly close the connection
+                Thread.sleep(100);
                 return port;
-            } catch (IOException ioe) {
-            } finally {
-                if (ss != null) {
-                    try {
-                        ss.close();
-                    } catch (IOException ioe) {}
+            } catch (Exception e) {
+                exceptionCount++;
+                if (exceptionCount > 5) {
+                    throw new RuntimeException(e);
                 }
             }
         }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
index 1ba7f9476..0f51dcfaa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/tls/TestTLS.java
@@ -55,12 +55,16 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Tests with TLS enabled.
  */
+@RunWith(Parameterized.class)
 public class TestTLS extends BookKeeperClusterTestCase {
 
     private static final Logger LOG = LoggerFactory.getLogger(TestPerChannelBookieClient.class);
@@ -71,9 +75,32 @@
     private static boolean secureBookieSideChannel = false;
     private static Collection<Object> secureBookieSideChannelPrincipals = null;
 
-
-    public TestTLS() {
+    private TLSContextFactory.KeyStoreType clientKeyStoreFormat;
+    private TLSContextFactory.KeyStoreType clientTrustStoreFormat;
+    private TLSContextFactory.KeyStoreType serverKeyStoreFormat;
+    private TLSContextFactory.KeyStoreType serverTrustStoreFormat;
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { "JKS", "JKS" },
+                { "PEM", "PEM" },
+                { "PKCS12", "PKCS12" },
+                { "JKS", "PEM" },
+                { "PEM", "PKCS12" },
+                { "PKCS12", "JKS" }
+            });
+    }
+    public TestTLS(String keyStoreFormat, String trustStoreFormat) {
         super(3);
+        this.clientKeyStoreFormat = TLSContextFactory.KeyStoreType.valueOf(keyStoreFormat);
+        this.clientTrustStoreFormat = TLSContextFactory.KeyStoreType.valueOf(trustStoreFormat);
+        this.serverKeyStoreFormat = TLSContextFactory.KeyStoreType.valueOf(keyStoreFormat);
+        this.serverTrustStoreFormat = TLSContextFactory.KeyStoreType.valueOf(trustStoreFormat);
+    }
+
+    private String getResourcePath(String resource) throws Exception {
+        return this.getClass().getClassLoader().getResource(resource).toURI().getPath();
     }
 
     @Before
@@ -82,30 +109,101 @@ public void setUp() throws Exception {
         /* client configuration */
         baseClientConf.setTLSProviderFactoryClass(TLSContextFactory.class.getName());
         baseClientConf.setTLSClientAuthentication(true);
-        baseClientConf.setTLSKeyStoreType("JKS");
-        baseClientConf.setTLSKeyStore(
-            this.getClass().getClassLoader().getResource("client.jks").toURI().getPath());
-        baseClientConf.setTLSKeyStorePasswordPath(
-            this.getClass().getClassLoader().getResource("keyStoreClientPassword.txt").toURI().getPath());
-        baseClientConf.setTLSTrustStoreType("JKS");
-        baseClientConf.setTLSTrustStore(
-            this.getClass().getClassLoader().getResource("cacerts").toURI().getPath());
-        baseClientConf.setTLSTrustStorePasswordPath(
-            this.getClass().getClassLoader().getResource("trustStorePassword.txt").toURI().getPath());
+
+        switch (clientKeyStoreFormat) {
+        case PEM:
+            baseClientConf.setTLSKeyStoreType("PEM");
+            baseClientConf.setTLSKeyStore(getResourcePath("client-key.pem"));
+            baseClientConf.setTLSCertificatePath(getResourcePath("client-cert.pem"));
+
+            break;
+        case JKS:
+            baseClientConf.setTLSKeyStoreType("JKS");
+            baseClientConf.setTLSKeyStore(getResourcePath("client-key.jks"));
+            baseClientConf.setTLSKeyStorePasswordPath(getResourcePath("keyStoreClientPassword.txt"));
+
+            break;
+        case PKCS12:
+            baseClientConf.setTLSKeyStoreType("PKCS12");
+            baseClientConf.setTLSKeyStore(getResourcePath("client-key.p12"));
+            baseClientConf.setTLSKeyStorePasswordPath(getResourcePath("keyStoreClientPassword.txt"));
+
+            break;
+        default:
+            throw new Exception("Invalid client keystore format" + clientKeyStoreFormat);
+        }
+
+        switch (clientTrustStoreFormat) {
+        case PEM:
+            baseClientConf.setTLSTrustStoreType("PEM");
+            baseClientConf.setTLSTrustStore(getResourcePath("server-cert.pem"));
+
+            break;
+        case JKS:
+            baseClientConf.setTLSTrustStoreType("JKS");
+            baseClientConf.setTLSTrustStore(getResourcePath("server-key.jks"));
+            baseClientConf.setTLSTrustStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+
+            break;
+        case PKCS12:
+            baseClientConf.setTLSTrustStoreType("PKCS12");
+            baseClientConf.setTLSTrustStore(getResourcePath("server-key.p12"));
+            baseClientConf.setTLSTrustStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+
+            break;
+        default:
+            throw new Exception("Invalid client keystore format" + clientTrustStoreFormat);
+        }
 
         /* server configuration */
         baseConf.setTLSProviderFactoryClass(TLSContextFactory.class.getName());
         baseConf.setTLSClientAuthentication(true);
-        baseConf.setTLSKeyStoreType("JKS");
-        baseConf.setTLSKeyStore(
-            this.getClass().getClassLoader().getResource("server.jks").toURI().getPath());
-        baseConf.setTLSKeyStorePasswordPath(
-            this.getClass().getClassLoader().getResource("keyStoreServerPassword.txt").toURI().getPath());
-        baseConf.setTLSTrustStoreType("JKS");
-        baseConf.setTLSTrustStore(
-            this.getClass().getClassLoader().getResource("cacerts").toURI().getPath());
-        baseConf.setTLSTrustStorePasswordPath(
-            this.getClass().getClassLoader().getResource("trustStorePassword.txt").toURI().getPath());
+
+        switch (serverKeyStoreFormat) {
+        case PEM:
+            baseConf.setTLSKeyStoreType("PEM");
+            baseConf.setTLSKeyStore(getResourcePath("server-key.pem"));
+            baseConf.setTLSCertificatePath(getResourcePath("server-cert.pem"));
+
+            break;
+        case JKS:
+            baseConf.setTLSKeyStoreType("JKS");
+            baseConf.setTLSKeyStore(getResourcePath("server-key.jks"));
+            baseConf.setTLSKeyStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+
+            break;
+        case PKCS12:
+            baseConf.setTLSKeyStoreType("PKCS12");
+            baseConf.setTLSKeyStore(getResourcePath("server-key.p12"));
+            baseConf.setTLSKeyStorePasswordPath(getResourcePath("keyStoreServerPassword.txt"));
+
+            break;
+        default:
+            throw new Exception("Invalid server keystore format" + serverKeyStoreFormat);
+        }
+
+        switch (serverTrustStoreFormat) {
+        case PEM:
+            baseConf.setTLSTrustStoreType("PEM");
+            baseConf.setTLSTrustStore(getResourcePath("client-cert.pem"));
+
+            break;
+        case JKS:
+            baseConf.setTLSTrustStoreType("JKS");
+            baseConf.setTLSTrustStore(getResourcePath("client-key.jks"));
+            baseConf.setTLSTrustStorePasswordPath(getResourcePath("keyStoreClientPassword.txt"));
+
+            break;
+
+        case PKCS12:
+            baseConf.setTLSTrustStoreType("PKCS12");
+            baseConf.setTLSTrustStore(getResourcePath("client-key.p12"));
+            baseConf.setTLSTrustStorePasswordPath(getResourcePath("keyStoreClientPassword.txt"));
+
+            break;
+        default:
+            throw new Exception("Invalid server keystore format" + serverTrustStoreFormat);
+        }
 
         super.setUp();
     }
@@ -119,7 +217,7 @@ public void tearDown() throws Exception {
     /**
      * Verify that a server will not start if tls is enabled but no cert is specified.
      */
-    @Test
+    @Test(timeout = 60000)
     public void testStartTLSServerNoKeyStore() throws Exception {
         ServerConfiguration bookieConf = newServerConfiguration().setTLSKeyStore(null);
 
diff --git a/bookkeeper-server/src/test/resources/cacerts b/bookkeeper-server/src/test/resources/cacerts
deleted file mode 100644
index 5016b0c74..000000000
Binary files a/bookkeeper-server/src/test/resources/cacerts and /dev/null differ
diff --git a/bookkeeper-server/src/test/resources/client-cert.pem b/bookkeeper-server/src/test/resources/client-cert.pem
new file mode 100644
index 000000000..d819f198b
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/client-cert.pem
@@ -0,0 +1,32 @@
+-----BEGIN CERTIFICATE-----
+MIIFmTCCA4GgAwIBAgIJAKJZcAdMXw8CMA0GCSqGSIb3DQEBCwUAMGIxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEOMAwG
+A1UECgwFRHVtbXkxHjAcBgNVBAMMFWFwYWNoZS5ib29ra2VlcGVyLm9yZzAgFw0x
+ODAxMjQxODM2MjRaGA8zMDE3MDUyNzE4MzYyNFowYjELMAkGA1UEBhMCVVMxCzAJ
+BgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMQ4wDAYDVQQKDAVEdW1t
+eTEeMBwGA1UEAwwVYXBhY2hlLmJvb2trZWVwZXIub3JnMIICIjANBgkqhkiG9w0B
+AQEFAAOCAg8AMIICCgKCAgEAwdphglnvMvIrEjHgBekeTtg9KmUtmK9yviI0xKdP
+8xMC6r5lvQIXQwcVB2LA7zBl3mI3s2DT9cqZ+E6q9Vz3AF9yV+F2zvPcQOt3P9hO
+CvEj9aXqMuzvSXQYmG+w7lLhm4M7IX7B4smGvXm905WACUeLr2XM1qr2SlpMZIBc
+O35NyaOeB9srQS2NXMB8mIsWDzoftbW0eNji7uz6OuyNvQWcS7rdcXBSoBRyRU2y
+qpQcfQT9aUDttDsAtkvEvuHi8LicKocaF1ufHVFvygdX7nGCNOnC83ZW5HvttwAJ
+C8Spe67etFp98bumwsXYaPO0su7M1Ym1DbphJsGO0LNwdEWstjT4uIsxyXZtblfM
+DBFpxA4s/7hIZtmrjM3CyZuoAuJWsU8UkZOVMWtxfBbY4iCo6K/ScaFSFvMykJ+p
+MST0U83JlDrqH+XjmIxcGrs0FVgD8vtaPcPJFWhm9TUBTFo1CU3Zw74W49ltgRa/
+oOLIzn27XHCcTY0LDRLXhvtGiPOSw+2U192w0MQVby6Q/QxAGTl5UXIeHsp91fkO
+H3pRHYNdjF5CVFn5vsNEcdjQQa1bi4Cr1O64yV6kHBtvsp3bKmkfBPT+mO1OzV+N
+vjfNswk606NCd6rI8UePErDI7YIJJvHphd8EHyz/Kr64ZjLodgKCyETEDWsvVaex
+8eMCAwEAAaNQME4wHQYDVR0OBBYEFCuc137GLVBGagBjH2STNeJ2gZcZMB8GA1Ud
+IwQYMBaAFCuc137GLVBGagBjH2STNeJ2gZcZMAwGA1UdEwQFMAMBAf8wDQYJKoZI
+hvcNAQELBQADggIBAJ/PW17Gk5qbsKxnxdpNctqCOlYOLcZ+/45LvTKFQDgi8kYi
+/4WkhvADWHYYTXRibgHYWMxU+/JCbgOgwPP30646mq0cvQGYN7aQTjYY8TRMbbLh
+PGGHu30EwL/siXVsHBS0ZonXpRvYr+Bn5V0JXdw1JNjGs4C3GxyIoK1zVqXRmS/b
+u052AtKN8A+iLw4F/8b5rmpRg1BDguc1kqaVh0c64aVZ49QM1IVpvWx1fIpxZPj2
+9YlHUYmmt7mf6313VplRL98jqH+U959ueJjQ9RmEMwgUeu3iIN75/kytkpGrMLdC
+WwKlDWBid66TJaPgJ8mBUC9LNlndSe5bzpueN12MFtaLgaWopiB8WU5pEdMV2eze
+auvrUJkobl5n6vAcip1+SfrnY4EvXwdrO2VpIVkEhRWLfxsuthKwobi7/gR2jIa/
+DT/o5eqvdlxHwdIZ0VVPDdUwngQT26VkGozyrBFlhuSkv7pI/8mjFPjYF1vMcQbi
+J6C3C/K9gQp8XXOFm8r3Szqd14yzqigx2SWz4VWY7L7vX2iVs5gNb1/Fnjt2lQ20
+XMCDNoN8tO6kIqv03L4MRue5ISbJx5hcFML1/aLaidpOhWWgEBsH5hhOVdzzAOGF
+vWBXMgfSLxq/cD2ZZARq5ZHhsX1x5BeZfw4zS6dJspxF6Io4CETOOxi6AA3L
+-----END CERTIFICATE-----
diff --git a/bookkeeper-server/src/test/resources/client-key.jks b/bookkeeper-server/src/test/resources/client-key.jks
new file mode 100644
index 000000000..5bd6e53cc
Binary files /dev/null and b/bookkeeper-server/src/test/resources/client-key.jks differ
diff --git a/bookkeeper-server/src/test/resources/client-key.p12 b/bookkeeper-server/src/test/resources/client-key.p12
new file mode 100644
index 000000000..36a950934
Binary files /dev/null and b/bookkeeper-server/src/test/resources/client-key.p12 differ
diff --git a/bookkeeper-server/src/test/resources/client-key.pem b/bookkeeper-server/src/test/resources/client-key.pem
new file mode 100644
index 000000000..d92a0bd1b
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/client-key.pem
@@ -0,0 +1,52 @@
+-----BEGIN PRIVATE KEY-----
+MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQDB2mGCWe8y8isS
+MeAF6R5O2D0qZS2Yr3K+IjTEp0/zEwLqvmW9AhdDBxUHYsDvMGXeYjezYNP1ypn4
+Tqr1XPcAX3JX4XbO89xA63c/2E4K8SP1peoy7O9JdBiYb7DuUuGbgzshfsHiyYa9
+eb3TlYAJR4uvZczWqvZKWkxkgFw7fk3Jo54H2ytBLY1cwHyYixYPOh+1tbR42OLu
+7Po67I29BZxLut1xcFKgFHJFTbKqlBx9BP1pQO20OwC2S8S+4eLwuJwqhxoXW58d
+UW/KB1fucYI06cLzdlbke+23AAkLxKl7rt60Wn3xu6bCxdho87Sy7szVibUNumEm
+wY7Qs3B0Ray2NPi4izHJdm1uV8wMEWnEDiz/uEhm2auMzcLJm6gC4laxTxSRk5Ux
+a3F8FtjiIKjor9JxoVIW8zKQn6kxJPRTzcmUOuof5eOYjFwauzQVWAPy+1o9w8kV
+aGb1NQFMWjUJTdnDvhbj2W2BFr+g4sjOfbtccJxNjQsNEteG+0aI85LD7ZTX3bDQ
+xBVvLpD9DEAZOXlRch4eyn3V+Q4felEdg12MXkJUWfm+w0Rx2NBBrVuLgKvU7rjJ
+XqQcG2+yndsqaR8E9P6Y7U7NX42+N82zCTrTo0J3qsjxR48SsMjtggkm8emF3wQf
+LP8qvrhmMuh2AoLIRMQNay9Vp7Hx4wIDAQABAoICAQCqaVuGx6CrXI/YctfI2mG2
+VgmPF1q5+qIX2uIgbiSuPmw2CCJPwWLJnZQy5fFNU3J5yEXG/rvWOsCXtDA9eff4
+7+8Iqj9TNrTMrTIrge85VzqRW8VB919zZweoGaekGmAR4Y89pryyrQ4xyq/BLI9d
+mPOGwSsNG0Vfn3nAb8ak1idzts3ZgiXIKk821k+xmbNOt33gs1dvVNpJxzFCU2lW
+XXREboT0kBVSfCboHaGOqp1Qme5bdKSB58x8dKcEVna1vtQp3pJlLjn1//0R0NrP
+1iDsewLSG5nPSdJzKSjKm5uSCuvkCBjnRFsYpevUd0jGc37FyUTMSKfW9hiiBtw3
+EQnA8vMvLdeJvBTPrFerfbrkbXhxWKATTO+nN6JThQgUzIQdxiGIpOPxKli4t9xm
+qvFfokOfacymTzyDu/R9upZokRiblO0xtJ89odVSS8SwgskoiXgm3InOiATxWTaM
+gTG/8rPvG6OTSYgBqLRQ5z2XgOG7oEBU9rlPfyivlGH7N5LrCHXF2j+4+fMxgfbe
+vO+FqI0SQvtW8mGxuJ3aaKuJORZDQrTg3Z/AccZTDfFAvmRbAny8IZR/H00GIvFR
+Arwvt0fh/8IGekZVzA5DRW89DWaXUQI5lhX4UviThQK9fw+P4ZCmHmVoLpd0jxYY
+utyQpRbz1rYyiPd/TPRdQQKCAQEA9PL179+hQp6vPCfI6lT/Rus5JQtyu08J8CgJ
+x81m8rySLgsAvFBLLLqm19QViLkgryizhllnf6gUExFEG317rqlR3/1gso/QgWdj
+L7hexHhJPLUYPkHH0CBPceAKWJfFpSkqXD7tskumtj4X3Bdp0GRKHitWpZoF/FiX
+9A2djrSHS725eWCU3KgDtKVf8Ygz3Kx7XGw4+RmE9EBtLItxvlXCPsYOfWz6O06O
+4iFaAebHA23IwOw3wk36XU86aelSQZMiXHJcfgzTJyGZOY0UgKuamuU6WchAmzK1
+Ao0tCKfqjFOdQIWGDP/g/ceHq0aMSOwIAFn6GVo4+OYgdbGXGQKCAQEAyplJaVtF
+q1VEsQt9kq/WwGfjjGHPLJEVMHRnNcORg+FarfbKY5jt8nv00eMGm65BxmzXGmvl
+pxyQjZw6nZ9JQfKe7fPRE0Dz3aELSS5+8MiljpndMpRBdj4P13phcKYA9VxYyHVX
+sSt0ZGI2hDuk2LRmPDjkUMtn1FE9wH8WrBH2vENMs16f9lzpUEtpaz7HyFWyscJX
+1koJIWdR93joQMyDb0B34lNoXQ2OrfiS5/AvkeSwkj98nm9kJfsm1bKUq1pYesTT
+Owux602R6fGX8VB5hyYIPA0WqEkxHvwS65bcPWexLpIyDd0uixNK9WUOAWyfMyD2
+fdcrocl7ay6cWwKCAQB2cY1uwkot9qFxiyNh/Fu8JT3qpdCCtkNt905TaQUg1wIw
+dW2ToZfYNyE6N/l5tVsSl7HHgy/C0Ll0RuMSD+lgmctXbiP19Ai0qhOSHarlgeyY
+CFGCuTgvcZA41kbqc+lEZdVv6ZXyoxYoBXpwGHo4JGaalAY/6Wx/iy9e+b54JN9P
+RpyLDqKs2CmCjn0IQ/4f9N9p34LlIOvjV8vywDLuAHX++LJFAA834lLBEbN+O+N7
+yvhKIW8M67vmpsruL75wqv7wiPQkl3r67woyg/+oAFKwF6vRgj2LTkesxitChj+q
+PzxI2MfrPUfEL1lw/poTIN71nIyM+c2WvWBwyMDxAoIBAG4PC5RSYuyKa8CJ73OK
+Vm07gp+2WqdpQUuLUK4iSaCNAYfTs2qbn1fFAuAqJmLYLR8v7UKLLryzhcuH/Ue3
+SkKrHK9Dbma5OEFDxS/CNG91cIqhB0r8wvsLB+wUrW5Wn9qqigiLxlGWu6n0uIzp
+IcofZhJ9DXrepM7wO02hPJ3JPHJVVQtz8g4RtyVJckEyX7Fy7JooazMcEQ22ZQ68
+/d6FuzjqmrW2fdFfFg1oJdYd4pms1Eb+eiJPfOYtI5Gfa6gSclJvLhi7Z7Hd99BQ
+0Cvlfb9vZ7XHnnFZIXglk9mroIUzGUulW8+wQiKHHodkmFEpwuoxk/YUt70yCPvW
+3FUCggEAMLOHunak/3VClEI8eO6eP1JuH+cnlH2Bpj+/34h6QM8A48J/aI63d/2b
+7bas9ZB9iHfvo22Csu3tjOP4vFyCrumx8ilCgiCTY01DOHqR8UHA+OiX2yiQvEn6
+8he/ku6hizT9w1w0tf9NTxOE4Jy9M5UQ1Ol+20Iwjw99rrHyutM6HpIT9UQaLw4y
+a7tr4CF9+X/1Zjz2/He4fZ9h5akt8pHRjQi4oWx8Adi195s+Cdsrxm1NBTMFM7VU
+ywOTiEpvn6gS3PguMFLwhRObp36VBwymrC7ACcJsszqr2OypzQYbi+XyMj9WfQ80
+lZ/dii5O+DlR4aDU90na6bTa5v8ASw==
+-----END PRIVATE KEY-----
diff --git a/bookkeeper-server/src/test/resources/client.jks b/bookkeeper-server/src/test/resources/client.jks
deleted file mode 100644
index a959a9b82..000000000
Binary files a/bookkeeper-server/src/test/resources/client.jks and /dev/null differ
diff --git a/bookkeeper-server/src/test/resources/generateKeysAndCerts.sh b/bookkeeper-server/src/test/resources/generateKeysAndCerts.sh
new file mode 100755
index 000000000..3d8fdb699
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/generateKeysAndCerts.sh
@@ -0,0 +1,76 @@
+#!/usr/bin/env bash
+#
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements.  See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership.  The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License.  You may obtain a copy of the License at
+# *
+# *     http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+# This script generates keys and self-signed certificates in
+# PEM, JKS and PKCS12 formats.
+
+# remove existing files.
+rm ./server-key.pem \
+    ./server-key.p12 \
+    ./server-key.jks \
+    ./server-cert.pem \
+    ./client-key.pem \
+    ./client-key.p12 \
+    ./client-key.jks \
+    ./client-cert.pem \
+    ./keyStoreServerPassword.txt \
+    ./keyStoreClientPassword.txt
+
+
+# One line command to create server keys and self signed certificates.
+openssl req \
+        -new \
+        -newkey rsa:4096 \
+        -days 365000 \
+        -nodes \
+        -x509 \
+        -subj "/C=US/ST=CA/L=San Francisco/O=Dummy/CN=apache.bookkeeper.org" \
+        -out server-cert.pem \
+        -keyout server-key.pem
+
+# Convert crt/key to PKCS12 format.
+openssl pkcs12 -export -in server-cert.pem -inkey server-key.pem -out server-key.p12 -passout pass:server
+
+# store password in a file
+echo "server" > keyStoreServerPassword.txt
+
+# Convert crt/key to JKS format.
+keytool -importkeystore -srckeystore server-key.p12 -srcstoretype pkcs12 -srcstorepass server -destkeystore server-key.jks -deststoretype jks -deststorepass server
+
+# One line command to create client keys and self signed certificates.
+openssl req \
+        -new \
+        -newkey rsa:4096 \
+        -days 365000 \
+        -nodes \
+        -x509 \
+        -subj "/C=US/ST=CA/L=San Francisco/O=Dummy/CN=apache.bookkeeper.org" \
+        -out client-cert.pem \
+        -keyout client-key.pem
+
+# Convert crt/key to PKCS12 format.
+openssl pkcs12 -export -in client-cert.pem -inkey client-key.pem -out client-key.p12 -passout pass:client
+
+# store password in a file.
+echo "client" > keyStoreClientPassword.txt
+
+# Convert crt/key to JKS format.
+keytool -importkeystore -srckeystore client-key.p12 -srcstoretype pkcs12 -srcstorepass client -destkeystore client-key.jks -deststoretype jks -deststorepass client
+
diff --git a/bookkeeper-server/src/test/resources/server-cert.pem b/bookkeeper-server/src/test/resources/server-cert.pem
new file mode 100644
index 000000000..b19de519b
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/server-cert.pem
@@ -0,0 +1,32 @@
+-----BEGIN CERTIFICATE-----
+MIIFmTCCA4GgAwIBAgIJAJLN/+fjRP2hMA0GCSqGSIb3DQEBCwUAMGIxCzAJBgNV
+BAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzEOMAwG
+A1UECgwFRHVtbXkxHjAcBgNVBAMMFWFwYWNoZS5ib29ra2VlcGVyLm9yZzAgFw0x
+ODAxMjQxODM2MjNaGA8zMDE3MDUyNzE4MzYyM1owYjELMAkGA1UEBhMCVVMxCzAJ
+BgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMQ4wDAYDVQQKDAVEdW1t
+eTEeMBwGA1UEAwwVYXBhY2hlLmJvb2trZWVwZXIub3JnMIICIjANBgkqhkiG9w0B
+AQEFAAOCAg8AMIICCgKCAgEAvn6D0jbhA9OLpfuklC3ytUOuBXlFlgWlFEXTyqH8
+P54tH70+EJjmkV6d9kzBZohAL1cWgGwwQUR2MMIYLvKjRGnULkaCJor5WBP5D0wi
+jjTCsVNJcdC4gbEUjCl5HSmP4hPRVEHzFfWS61bJ2KDxAL7GCnAz+10MEr04KX62
+fOLr/eCHG8icfwnQ1O2fJs8cGUcxTeE5k/DV/103gH/49K3cFHHPNzyVWad8dgQ5
+Y0IZ1HTh/e/3IvsaOPp2EuKof9WtLYjjWnCMmL9MsmJiBmufetzZVqiIb8WNxa+e
+OR46Cm5lp7DwIXrUurYHTgtugmB9Pm1RL+T2YVHrPSbZ4yKYV7V6RRR/j+y4aI/7
+YXnZKI1zjDQT1GiyZDJGUBSILiMO9G5vbEL4btJlyQ4oBtlKozOLqi36GmlzOpSS
+yMVu7xyx6/va94YbI2VlTvCF/2sSUsf2ZNg4oL44+NfZDGO7vaDVFavdEsdkxxse
+wKa9uYpkfN7i4MZKi7CCR4lZo78xzt5vZ/irCqVngl3gwwBokfKLGskUi7Xwv/Sd
+gu2W88dAOzIfU878+MKeqVN3Uz91RhUgCrFhbhE/8E87WEzM0MZ68UxxlANI6tiA
+sBcYUUvhq154RPc5iY2Mzp48hju4NEKLvSbRDhJLpJGAVuupqzzM6iC1Uw23KwpI
+KOUCAwEAAaNQME4wHQYDVR0OBBYEFBjV9p9fG1YMVK7PrGrJ3wJLx370MB8GA1Ud
+IwQYMBaAFBjV9p9fG1YMVK7PrGrJ3wJLx370MAwGA1UdEwQFMAMBAf8wDQYJKoZI
+hvcNAQELBQADggIBABc8029Iw5b+qS+B110qRRfj0fBHz8x4jjIQdzHah3qqM3FM
+VzpySogESmKT3A+UmNUqzop/l+2V4G4g66UV7ps0rtJkgQU17Q70QF0TCQ8YranA
+5Vy/qZe/AXwrjjeVlmaFRxKpKWDDdlMX84+CjXdhAyMKSSezbtMKDvfERB2oGcjD
+Pz6d0XekRhqX4CiJLA2zkSZ55ABuJwyK24KeOtirsKxZXgQ5pG78zblWm3w2OdJv
+NxictCLqemDmEsMBuk2mlIBogm2tF7U9QBrIa7z2le7SjxLTO/+hWFOcDr41Bwcd
+cWoi1OkdiM3kg2COItCxtcHI8VmU17WL98SQMQsmby9RbVPXr56/x5Xi12CR/5b3
+LKCy6xmX4GEkWgc4VkvJp07eoArRxfQPEeYzZmwXBRcDO4uGRp9680QacwUvCOYe
+9Kkpe+iVdVMY2vAyil0ZmJ1oR3wW3hsiwu0RTEU1bKwrePqCR6t4EC6ihygYn0nG
+Y6PkdKMq/hoWjLo0p0iZuYMbbMNYcZWNAwV90eltw57ITyv8EI+D8hJ/b18wzPIA
+91QyqZ8qd7eo+o/YTGY0YzZHSk147pDk4nqK0d0kszr6SxBwx7i0KSsAB4Kvtrj8
+BLTOlCqwMKKmEuwlNNfonINZyjzRY5cTMeuxj8U7LjTamMOQH4kgnQ0FoEX6
+-----END CERTIFICATE-----
diff --git a/bookkeeper-server/src/test/resources/server-key.jks b/bookkeeper-server/src/test/resources/server-key.jks
new file mode 100644
index 000000000..3949b9d6e
Binary files /dev/null and b/bookkeeper-server/src/test/resources/server-key.jks differ
diff --git a/bookkeeper-server/src/test/resources/server-key.p12 b/bookkeeper-server/src/test/resources/server-key.p12
new file mode 100644
index 000000000..ab9c8aa3b
Binary files /dev/null and b/bookkeeper-server/src/test/resources/server-key.p12 differ
diff --git a/bookkeeper-server/src/test/resources/server-key.pem b/bookkeeper-server/src/test/resources/server-key.pem
new file mode 100644
index 000000000..1e2f1ec9c
--- /dev/null
+++ b/bookkeeper-server/src/test/resources/server-key.pem
@@ -0,0 +1,52 @@
+-----BEGIN PRIVATE KEY-----
+MIIJQgIBADANBgkqhkiG9w0BAQEFAASCCSwwggkoAgEAAoICAQC+foPSNuED04ul
++6SULfK1Q64FeUWWBaUURdPKofw/ni0fvT4QmOaRXp32TMFmiEAvVxaAbDBBRHYw
+whgu8qNEadQuRoImivlYE/kPTCKONMKxU0lx0LiBsRSMKXkdKY/iE9FUQfMV9ZLr
+VsnYoPEAvsYKcDP7XQwSvTgpfrZ84uv94IcbyJx/CdDU7Z8mzxwZRzFN4TmT8NX/
+XTeAf/j0rdwUcc83PJVZp3x2BDljQhnUdOH97/ci+xo4+nYS4qh/1a0tiONacIyY
+v0yyYmIGa5963NlWqIhvxY3Fr545HjoKbmWnsPAhetS6tgdOC26CYH0+bVEv5PZh
+Ues9JtnjIphXtXpFFH+P7Lhoj/thedkojXOMNBPUaLJkMkZQFIguIw70bm9sQvhu
+0mXJDigG2UqjM4uqLfoaaXM6lJLIxW7vHLHr+9r3hhsjZWVO8IX/axJSx/Zk2Dig
+vjj419kMY7u9oNUVq90Sx2THGx7Apr25imR83uLgxkqLsIJHiVmjvzHO3m9n+KsK
+pWeCXeDDAGiR8osayRSLtfC/9J2C7Zbzx0A7Mh9Tzvz4wp6pU3dTP3VGFSAKsWFu
+ET/wTztYTMzQxnrxTHGUA0jq2ICwFxhRS+GrXnhE9zmJjYzOnjyGO7g0Qou9JtEO
+EkukkYBW66mrPMzqILVTDbcrCkgo5QIDAQABAoICAQCvgea33iIQoW4vfhrS/0Z3
+pSSHHIV1RDwk4nTQY9ABWR2f+X5eUlFULAWDcJJbgjsIoscziPoomAgAwkL/tkOg
+e5SnEgVFt5MliDlW08GenZOnRuIK/8+OhfU1cdyJdsp+891QMPbjC3/SXgLYGOgS
+1LGn2lq6Q68k8Lr22C0QAQ6GuMAiZAFztjp2g3u3iOgNjh8p7tFasXCot1y0grN/
+01NKbtUIwkOj94DfRuMMxVEBArNYgCeFTi6JwpDYs4WlSdwlcNJvd/TBorbqP2Sr
+H6suyp1fjyUtPalyMmynmWbGR5JXHtkPL5khcSZnzHaDnpyl0JgVdXFeltgSXmIt
+oCF882Jc0u3qvQ+mVeW6VRQtzMxfU6L9diP7GI7x3TvJhds1WZIT7kdC5LVjkBJs
+JVYnhixEnwbQqSAk5VgqIVbDKmURAXDVp8PRpRLwKuQfgl6w1/KNzfmge1VXR3cO
+E/sX6Y00JMIJ9749JlPrcaD3fMLEwHH9NFdc7XIpMDpSmBkIa3+zoaQQ0rtMBSr3
+OF1SnHT5Xb4ykoPi5XPMLTqkkZTd622R9kBBpwkJh7Y5sGFK9FI9CmvMmC/ziHSI
+Orb+gRu39ECQB+u6It/sYvM7i/cFkCmdGcRuTaCDmYiooE8Tjp8YMR5GTyLIV874
+GKjqGSHrvaz58yHrCXEkXQKCAQEA6TvW/C4pevHjdmTrnTOp3OIDYhDvgyJq278J
+ID0sq8ZPv6V1dyjEqreXpb/R2//aPH2EdzfOpBhbmSUKj5h5gOQ3CH9vkJP27oXJ
+tx06ezSg8+WpDH0I+xYHMp2yj8GF+KrgC8qCSqxOglePVZ268ou6lyqgNYa4Ujv7
+XI8O/6ha+k6fYrEooMjINC8m0gKCzlrIvUsu9o/uhJAbxOeXX8U/+DVb397WQ6cB
+THAxo7uIzoarD7+VhWbXY+9OIANWE/RQ5rhIvOmBf+hOYgMKlX0F67uM/go0ZEvy
+05rccFsqprGEibs+Ic0w5cRwV+BqYcLrzoRkmKwkKwa9igxiTwKCAQEA0RatmiAY
+Gc1zBsfyq8f3ZqKZ2OrOCrFcsN4rMnyNBCh64yTdyNT/alCZUzy8/aKSDbvqb+Cc
+BqbCVlDq+jVkiDvs7k497noXQEJ2KfGm1g8hOX1U7AjOse4qOPhujTCYu9hSDtYB
+EPTw2OqEGOMCCuv4xzdlgK0vbSOztrF/dWp385B6hTXBvUqTlECtUhFa4QryCI9w
+NrsACQaiUt0Xu2SW1EOUv0dzTlmzyAf1s4IKHpm8t0Rkr1OqYRCqW2Ey+mKE6z6y
+NeAudaRCoxe40Z/7QrJ9eBhu+oympXazLj+BPEXHU2wkaCjXWMcolItxMpaIaD3X
+nD1Mt0QqRzq4iwKCAQA+iuNdgGtzIoYia3GbGA2Gw7ywgWYYvhP1lUa3NHBUJ7ue
+4pmbOH10YgLyWXvHCNbWvbnV1ks9SaLWcE5irzp1y7zONI4QMP1YfNvYlKfn/fbj
+MESiqqzL195aPltxnS11vyyRPN6vc4EiBqTTCpblD38bpjyL3fJzas4+xcX53IV4
+9bhb2LHSW8UD6Vj5m97DwyhtSknvqC0HszUfGhNHhTdgMb7PS4wdXB1HCBbnlxRa
+fVZFxNQtj6RWkgdbIknk0/EVzXkD34HwcLUEJ1ihOYNq8UIfpVDjTFJzV+Wg43GO
+fa/S1zkUC1f/ZSvTBMTCLmjZWjs3jYGtYANXj3aVAoIBAHzg1JKm9H4ErNyx8wgS
+CHsuRkC+DI1qXPft2VLv/LEtFCgxzpyySlJPDSQftKivvheh0mU7ezSlyJARCCak
+WQTc9adm56pVFSn2B+kJQSG8K5XQezX2FK1El8cq6aw+CBq5GlluC3j7MhX8CyVp
+/8BSK2WgemkeBqNinWVSIdQY4MeB1QtWjf3mWrpC3sGTR/n8tY3TTawCiATcB3sC
+PbhYXZUtP9v2arGy9aNUzbSGyFB6dbHnkVL931bVw0mMhgvxZ32xFnMDD/yHPJ13
+/5SDvmeZf0KJJU9TTfypJl9K4n8DFgeHIT9slSGa4WvG1LboHVRVCz9vhTA38CBW
+u/0CggEALh3hL56l5hpyXDwNQHR8XpZkX6rMuJmqD9f/18tM+deGuqj1/FRUYJrt
+/AeaN1UHjugitYDHGu2hLEORdp6mK19ENXcEajKP+f3TUliuBdogqTXCCNu1RS5p
+ru/G47QV0IxlKMxFRChL7pYs2/7PY0Qo37MvtEd6/FS7g95NZasE5ko8swkDbE7C
++F6wNBDFbS3RhIBfIPlr35Tv4sVEl7gAM7JdUqzIEDsmW38RQzIo8F8t1acwfdZ6
+PLVj2hh2JoF9jjMcy9o1VtnBWtoRkSD48+UmvrJuYrjd98XW4VRcq0VVsuOPBE+q
+EJRVpONvEXHus+f1sCUmQYWYuhmJHA==
+-----END PRIVATE KEY-----
diff --git a/bookkeeper-server/src/test/resources/server.jks b/bookkeeper-server/src/test/resources/server.jks
deleted file mode 100644
index aa582e6dd..000000000
Binary files a/bookkeeper-server/src/test/resources/server.jks and /dev/null differ
diff --git a/bookkeeper-server/src/test/resources/trustStorePassword.txt b/bookkeeper-server/src/test/resources/trustStorePassword.txt
deleted file mode 100644
index 1d40192ae..000000000
--- a/bookkeeper-server/src/test/resources/trustStorePassword.txt
+++ /dev/null
@@ -1 +0,0 @@
-changeit
diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java
index 6c84dea7d..4008c0486 100644
--- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java
+++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/main/java/org/apache/bookkeeper/stats/prometheus/PrometheusMetricsProvider.java
@@ -23,6 +23,7 @@
 import io.prometheus.client.hotspot.StandardExports;
 import io.prometheus.client.hotspot.ThreadExports;
 import java.net.InetSocketAddress;
+import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
@@ -39,6 +40,28 @@
 
     private final CollectorRegistry registry = new CollectorRegistry();
     private Server server;
+    private final CachingStatsProvider cachingStatsProvider;
+
+    public PrometheusMetricsProvider() {
+        this.cachingStatsProvider = new CachingStatsProvider(
+            new StatsProvider() {
+                @Override
+                public void start(Configuration conf) {
+                    // nop
+                }
+
+                @Override
+                public void stop() {
+                    // nop
+                }
+
+                @Override
+                public StatsLogger getStatsLogger(String scope) {
+                    return new PrometheusStatsLogger(registry, scope);
+                }
+            }
+        );
+    }
 
     @Override
     public void start(Configuration conf) {
@@ -79,7 +102,7 @@ public void stop() {
 
     @Override
     public StatsLogger getStatsLogger(String scope) {
-        return new PrometheusStatsLogger(registry, scope);
+        return this.cachingStatsProvider.getStatsLogger(scope);
     }
 
     private static final Logger log = LoggerFactory.getLogger(PrometheusMetricsProvider.class);
diff --git a/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java b/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
index e90cceeb0..84ad5f4b0 100644
--- a/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
+++ b/bookkeeper-stats-providers/prometheus-metrics-provider/src/test/java/org/apache/bookkeeper/stats/prometheus/TestPrometheusMetricsProvider.java
@@ -17,17 +17,40 @@
 package org.apache.bookkeeper.stats.prometheus;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 
 import io.prometheus.client.Collector;
 import io.prometheus.client.CollectorRegistry;
 import java.lang.reflect.Field;
 import java.util.Map;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.Test;
 
 public class TestPrometheusMetricsProvider {
 
     private final CollectorRegistry registry = new CollectorRegistry();
 
+    @Test
+    public void testCache() {
+        PrometheusMetricsProvider provider = new PrometheusMetricsProvider();
+
+        StatsLogger statsLogger =  provider.getStatsLogger("test");
+
+        OpStatsLogger opStatsLogger1 = statsLogger.getOpStatsLogger("optest");
+        OpStatsLogger opStatsLogger2 = statsLogger.getOpStatsLogger("optest");
+        assertSame(opStatsLogger1, opStatsLogger2);
+
+        Counter counter1 = statsLogger.getCounter("countertest");
+        Counter counter2 = statsLogger.getCounter("countertest");
+        assertSame(counter1, counter2);
+
+        StatsLogger scope1 = statsLogger.scope("scopetest");
+        StatsLogger scope2 = statsLogger.scope("scopetest");
+        assertSame(scope1, scope2);
+    }
+
     @Test
     public void testCounter() {
         PrometheusCounter counter = new PrometheusCounter(registry, "testcounter");
diff --git a/bookkeeper-stats-providers/twitter-finagle-provider/src/main/java/org/apache/bookkeeper/stats/twitter/finagle/FinagleStatsProvider.java b/bookkeeper-stats-providers/twitter-finagle-provider/src/main/java/org/apache/bookkeeper/stats/twitter/finagle/FinagleStatsProvider.java
index affe2f978..aff129d98 100644
--- a/bookkeeper-stats-providers/twitter-finagle-provider/src/main/java/org/apache/bookkeeper/stats/twitter/finagle/FinagleStatsProvider.java
+++ b/bookkeeper-stats-providers/twitter-finagle-provider/src/main/java/org/apache/bookkeeper/stats/twitter/finagle/FinagleStatsProvider.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.stats.twitter.finagle;
 
 import com.twitter.finagle.stats.StatsReceiver;
+import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
@@ -29,9 +30,28 @@
  */
 public class FinagleStatsProvider implements StatsProvider {
     private final StatsReceiver stats;
+    private final CachingStatsProvider cachingStatsProvider;
 
     public FinagleStatsProvider(final StatsReceiver stats) {
         this.stats = stats;
+        this.cachingStatsProvider = new CachingStatsProvider(
+            new StatsProvider() {
+                @Override
+                public void start(Configuration conf) {
+                    // nop
+                }
+
+                @Override
+                public void stop() {
+                    // nop
+                }
+
+                @Override
+                public StatsLogger getStatsLogger(String scope) {
+                    return new FinagleStatsLoggerImpl(stats.scope(scope));
+                }
+            }
+        );
     }
 
     @Override
@@ -42,6 +62,6 @@ public void stop() { /* no-op */ }
 
     @Override
     public StatsLogger getStatsLogger(final String scope) {
-        return new FinagleStatsLoggerImpl(this.stats.scope(scope));
+        return this.cachingStatsProvider.getStatsLogger(scope);
     }
 }
diff --git a/buildtools/src/main/resources/distributedlog/findbugsExclude.xml b/buildtools/src/main/resources/distributedlog/findbugsExclude.xml
new file mode 100644
index 000000000..7b2b80b99
--- /dev/null
+++ b/buildtools/src/main/resources/distributedlog/findbugsExclude.xml
@@ -0,0 +1,181 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<FindBugsFilter>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.tests\.generated.*" />
+  </Match>
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.thrift.*" />
+  </Match>
+  <!-- distributedlog-benchmark -->
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.benchmark\.thrift.*" />
+  </Match>
+  <!-- distributedlog-common -->
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+    <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils$2"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+    <Method name="Void" />
+    <Bug pattern="NM_METHOD_NAMING_CONVENTION" />
+  </Match>
+  <!-- distributedlog-core -->
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.Entry$Builder" />
+    <Method name="setData" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.Entry" />
+    <Method name="getRawData" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.BKAsyncLogReader" />
+    <Method name="run" />
+    <Bug pattern="JLM_JSR166_UTILCONCURRENT_MONITORENTER" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.BKLogReadHandler$1" />
+    <Method name="onSuccess" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.BookKeeperClient$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.ReadUtils" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.ReadUtils$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.auditor.DLAuditor$2" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.auditor.DLAuditor$8" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$4$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.bk.SimpleLedgerAllocator$5" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.acl.ZKAccessControl$4" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.acl.ZKAccessControlManager$1$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore$1$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock$12" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.lock.ZKSessionLock$13$1" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.util.Utils" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <Match>
+    <Class name="org.apache.distributedlog.util.Utils$6" />
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <!-- distributedlog-protocol -->
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.LogRecord" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.LogRecord" />
+    <Method name="getPayload" />
+    <Bug pattern="EI_EXPOSE_REP" />
+  </Match>
+  <!-- distributedlog-proxy-server -->
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~org\.apache\.distributedlog\.service\.placement\.thrift.*" />
+  </Match>
+  <Match>
+    <!-- it is safe to cast exception here. //-->
+    <Class name="org.apache.distributedlog.service.DistributedLogServiceImpl$Stream$2" />
+    <Method name="onFailure" />
+    <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+  <Match>
+    <!-- it is safe to cast exception here. //-->
+    <Class name="org.apache.distributedlog.service.stream.BulkWriteOp" />
+    <Method name="isDefiniteFailure" />
+    <Bug pattern="BC_IMPOSSIBLE_INSTANCEOF" />
+  </Match>
+  <!-- distributedlog-messaging -->
+  <Match>
+    <!-- generated code, we can't be held responsible for findbugs in it //-->
+    <Class name="~.*\.TransformedRecord" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.messaging.PartitionedMultiWriter" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+  <Match>
+    <!-- it is safe to store external bytes reference here. //-->
+    <Class name="org.apache.distributedlog.messaging.RRMultiWriter" />
+    <Bug pattern="EI_EXPOSE_REP2" />
+  </Match>
+</FindBugsFilter>
diff --git a/buildtools/src/main/resources/distributedlog/suppressions.xml b/buildtools/src/main/resources/distributedlog/suppressions.xml
new file mode 100644
index 000000000..6ee57b1b7
--- /dev/null
+++ b/buildtools/src/main/resources/distributedlog/suppressions.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<!DOCTYPE suppressions PUBLIC
+"-//Puppy Crawl//DTD Suppressions 1.1//EN"
+"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+
+<suppressions>
+  <suppress checks="JavadocPackage" files=".*[\\/]src[\\/]test[\\/].*"/>
+  <suppress checks="JavadocPackage" files=".*[\\/]maven-archetypes[\\/].*"/>
+  <suppress checks="JavadocPackage" files=".*[\\/]examples[\\/].*"/>
+  <suppress checks="AvoidStarImport" files=".*[\\/]distributedlog[\\/].*"/>
+  <suppress checks="StaticVariableName" files=".*[\\/]distributedlog[\\/].*"/>
+  <suppress checks="LocalVariableName" files=".*[\\/]distributedlog[\\/].*"/>
+  <suppress checks="LocalFinalVariableName" files=".*[\\/]distributedlog[\\/].*"/>
+  <suppress checks="MemberName" files=".*[\\/]distributedlog[\\/].*"/>
+
+  <!-- suppress all checks in the generated directories -->
+  <suppress checks=".*" files=".*[\\/]distributedlog\.thrift[\\/].*"/>
+  <suppress checks=".*" files=".+[\\/]generated[\\/].+\.java" />
+  <suppress checks=".*" files=".+[\\/]generated-sources[\\/].+\.java" />
+  <suppress checks=".*" files=".+[\\/]generated-test-sources[\\/].+\.java" />
+</suppressions>
+
diff --git a/dev/docker/Dockerfile b/dev/docker/Dockerfile
index f803b9664..228c23449 100644
--- a/dev/docker/Dockerfile
+++ b/dev/docker/Dockerfile
@@ -17,4 +17,7 @@
 # under the License.
 #
 
-FROM maven:3.5.0-jdk-8
+FROM maven:3.5.0-jdk-9
+
+RUN apt-get update
+RUN apt-get install -y g++ cmake
diff --git a/pom.xml b/pom.xml
index 8565fef5f..0742417d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
     <module>tests</module>
     <module>bookkeeper-dist</module>
     <module>microbenchmarks</module>
+    <module>stream/distributedlog</module>
   </modules>
   <mailingLists>
     <mailingList>
@@ -110,7 +111,7 @@
     <jmh.version>1.19</jmh.version>
     <junit.version>4.12</junit.version>
     <lombok.version>1.16.18</lombok.version>
-    <mockito-core.version>2.13.0</mockito-core.version>
+    <mockito.version>2.13.0</mockito.version>
     <netty.version>4.1.12.Final</netty.version>
     <netty-boringssl.version>2.0.3.Final</netty-boringssl.version>
     <powermock.version>2.0.0-beta.5</powermock.version>
@@ -181,7 +182,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>${mockito-core.version}</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/site/docs/4.6.1/api/ledger-api.md b/site/docs/4.6.1/api/ledger-api.md
index 17a883568..c247bfb8f 100644
--- a/site/docs/4.6.1/api/ledger-api.md
+++ b/site/docs/4.6.1/api/ledger-api.md
@@ -30,6 +30,51 @@ If you're using [Maven](https://maven.apache.org/), add this to your [`pom.xml`]
 </dependency>
 ```
 
+BookKeeper uses google [protobuf](https://github.com/google/protobuf/tree/master/java) and [guava](https://github.com/google/guava) libraries
+a lot. If your application might include different versions of protobuf or guava introduced by other dependencies, you can choose to use the
+shaded library, which relocates classes of protobuf and guava into a different namespace to avoid conflicts.
+
+You can use the shaded artifact of `bookkeeper-server`. Please note that [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin) doesn't generate
+a dependency-reduced pom file for shaded artifact using [shadedArtifactAttached](https://maven.apache.org/plugins/maven-shade-plugin/examples/attached-artifact.html). You need to manually exclude relocated packages when using the shaded artifact. A full example of how to use this is
+shown below.
+
+```xml
+<!-- in your <properties> block -->
+<bookkeeper.version>4.6.1</bookkeeper.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.bookkeeper</groupId>
+  <artifactId>bookkeeper-server</artifactId>
+  <version>${bookkeeper.version}</version>
+  <classifier>shaded</classifier> <!-- specify "shaded" classifier to use shaded artifact -->
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-common</artifactId>
+    </exclusion>
+    <exclusion>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-proto</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+```
+
+Or you can use a separate shaded artifact `bookkeeper-server-shaded`.
+
+```xml
+<!-- in your <properties> block -->
+<bookkeeper.version>4.6.1</bookkeeper.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.bookkeeper</groupId>
+  <artifactId>bookkeeper-server-shaded</artifactId>
+  <version>${bookkeeper.version}</version>
+</dependency>
+```
+
 ### Gradle
 
 If you're using [Gradle](https://gradle.org/), add this to your [`build.gradle`](https://spring.io/guides/gs/gradle/) build configuration file:
@@ -45,6 +90,24 @@ dependencies {
 }
 ```
 
+As with Maven, you can also configure Gradle to use the shaded jars.
+
+```groovy
+// use the shaded artifact of `bookkeeper-server` jar
+dependencies {
+    compile ('org.apache.bookkeeper:bookkeeper-server:{{ site.latest-version }}:shaded') {
+        exclude group: 'org.apache.bookkeeper', module: 'bookkeeper-common'
+        exclude group: 'org.apache.bookkeeper', module: 'bookkeeper-proto'
+    }
+}
+
+
+// use the `bookkeeper-server-shaded` jar
+dependencies {
+    compile 'org.apache.bookkeeper:bookkeeper-server-shaded:{{ site.latest-version }}'
+}
+```
+
 ## Connection string
 
 When interacting with BookKeeper using the Java client, you need to provide your client with a connection string, for which you have three options:
diff --git a/site/docs/4.6.1/overview/releaseNotes.md b/site/docs/4.6.1/overview/releaseNotes.md
index ac2c49847..7d7303606 100644
--- a/site/docs/4.6.1/overview/releaseNotes.md
+++ b/site/docs/4.6.1/overview/releaseNotes.md
@@ -2,18 +2,31 @@
 title: Apache BookKeeper 4.6.1 Release Notes
 ---
 
-This is the seventh release of BookKeeper as an Apache Top Level Project!
+This is the eighth release of BookKeeper as an Apache Top Level Project!
 
-The 4.6.1 release incorporates new fixes, improvements, and features since previous major release 4.5.0.
+The 4.6.1 release is a bugfix release which fixes a bunch of issues reported from users of 4.6.0.
 
 Apache BookKeeper users are encouraged to upgrade to 4.6.1. The technical details of this release are summarized
 below.
 
 ## Highlights
 
+- Fix critical bug on index persistence manager, see [https://github.com/apache/bookkeeper/pull/913](https://github.com/apache/bookkeeper/pull/913)
+
+- Fix critical bug to allow using versions of Netty newer than 4.1.2 on classpath, see [https://github.com/apache/bookkeeper/pull/996](https://github.com/apache/bookkeeper/pull/996)
+
+- Enhance Java 9 compatibility, see [https://github.com/apache/bookkeeper/issues/326](https://github.com/apache/bookkeeper/issues/326)
+
+- New option to track task execution time, see [https://github.com/apache/bookkeeper/issues/931](https://github.com/apache/bookkeeper/issues/931)
+
+- Distribute a version of BookKeeper which embeds and relocates Guava and Protobuf, see [https://github.com/apache/bookkeeper/issues/922](https://github.com/apache/bookkeeper/issues/922)
+
+- Add description for the new error code "Too many requests", see [https://github.com/apache/bookkeeper/pull/921](https://github.com/apache/bookkeeper/pull/921)
 
 ### Dependencies Upgrade
 
+There is no dependency upgrade since 4.6.0, but now we distribute a 'shaded' version of main artifacts, see [Ledger API](../ledger-api)
 
 ## Full list of changes
 
+- [https://github.com/apache/bookkeeper/issues?utf8=%E2%9C%93&q=label%3Arelease%2F4.6.1+is%3Aclosed](https://github.com/apache/bookkeeper/issues?utf8=%E2%9C%93&q=label%3Arelease%2F4.6.1+is%3Aclosed)
diff --git a/site/releases.md b/site/releases.md
index b58c41b13..aa704194b 100644
--- a/site/releases.md
+++ b/site/releases.md
@@ -27,6 +27,15 @@ Client Guide | API docs
 
 ## News
 
+### 30 January, 2018: Release 4.6.1 available
+
+This is the eighth release of BookKeeper as an Apache Top Level Project!
+
+The 4.6.1 release is a bugfix release.
+
+See [BookKeeper 4.6.1 Release Notes](../docs/4.6.1/overview/releaseNotes) for details.
+
+
 ### 27 December, 2017: Release 4.6.0 available
 
 This is the seventh release of BookKeeper as an Apache Top Level Project!
diff --git a/stream/distributedlog/common/pom.xml b/stream/distributedlog/common/pom.xml
index 19d3109ac..d5501506a 100644
--- a/stream/distributedlog/common/pom.xml
+++ b/stream/distributedlog/common/pom.xml
@@ -20,15 +20,15 @@
   <parent>
     <groupId>org.apache.distributedlog</groupId>
     <artifactId>distributedlog</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>4.7.0-SNAPSHOT</version>
   </parent>
   <artifactId>distributedlog-common</artifactId>
-  <name>Apache DistributedLog :: Common</name>
+  <name>Apache BookKeeper :: DistributedLog :: Common</name>
   <dependencies>
     <dependency>
       <groupId>org.apache.bookkeeper.stats</groupId>
       <artifactId>bookkeeper-stats-api</artifactId>
-      <version>${bookkeeper.version}</version>
+      <version>${project.parent.version}</version>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
@@ -36,17 +36,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>${slf4j.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <version>${lombok.version}</version>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.inferred</groupId>
       <artifactId>freebuilder</artifactId>
@@ -77,24 +66,6 @@
       <artifactId>lz4</artifactId>
       <version>${lz4.version}</version>
     </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>${mockito.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.jmock</groupId>
       <artifactId>jmock</artifactId>
@@ -105,8 +76,8 @@
   <build>
     <plugins>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
       </plugin>
       <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
@@ -146,13 +117,13 @@
             <version>${puppycrawl.checkstyle.version}</version>
           </dependency>
           <dependency>
-            <groupId>org.apache.distributedlog</groupId>
-            <artifactId>distributedlog-build-tools</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>buildtools</artifactId>
+            <version>${project.parent.version}</version>
           </dependency>
         </dependencies>
         <configuration>
-          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <configLocation>bookkeeper/checkstyle.xml</configLocation>
           <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
           <consoleOutput>true</consoleOutput>
           <failOnViolation>true</failOnViolation>
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
index 83e8e0e57..b390431b9 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -30,6 +30,7 @@
  * normally combines all properties with the same key into one list property automatically.
  * This class simply overwrites any existing mapping.
  */
+@SuppressWarnings("unchecked")
 public class ConcurrentBaseConfiguration extends AbstractConfiguration {
 
     private final ConcurrentHashMap<String, Object> map;
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
index 1131409e6..2b9994230 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentConstConfiguration.java
@@ -24,6 +24,7 @@
 /**
  * Invariant thread-safe view of some configuration.
  */
+@SuppressWarnings("unchecked")
 public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration {
     public ConcurrentConstConfiguration(Configuration conf) {
         checkNotNull(conf);
diff --git a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
index a887c59db..fecefbc3f 100644
--- a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
+++ b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
@@ -22,11 +22,9 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -175,7 +173,7 @@ public void testWithinAlreadyDone() throws Exception {
         assertFalse(withinFuture.isCancelled());
         assertFalse(withinFuture.isCompletedExceptionally());
         verify(scheduler, times(0))
-            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+            .schedule(eq(1234L), any(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
     }
 
     @Test
@@ -194,14 +192,14 @@ public void testWithinZeroTimeout() throws Exception {
         assertFalse(withinFuture.isCancelled());
         assertFalse(withinFuture.isCompletedExceptionally());
         verify(scheduler, times(0))
-            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+            .schedule(eq(1234L), any(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
     }
 
     @Test
     public void testWithinCompleteBeforeTimeout() throws Exception {
         OrderedScheduler scheduler = mock(OrderedScheduler.class);
         ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
-        when(scheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
+        when(scheduler.schedule(any(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
             .thenAnswer(invocationOnMock -> scheduledFuture);
         CompletableFuture<Long> newFuture = FutureUtils.createFuture();
         CompletableFuture<Long> withinFuture = FutureUtils.within(
@@ -276,6 +274,7 @@ public void testEnsureFailure() throws Exception {
         ensureLatch.await();
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testRescueSuccess() throws Exception {
         CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
diff --git a/stream/distributedlog/core/pom.xml b/stream/distributedlog/core/pom.xml
index aa55c2e8c..9f1673681 100644
--- a/stream/distributedlog/core/pom.xml
+++ b/stream/distributedlog/core/pom.xml
@@ -20,10 +20,10 @@
   <parent>
     <groupId>org.apache.distributedlog</groupId>
     <artifactId>distributedlog</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>4.7.0-SNAPSHOT</version>
   </parent>
   <artifactId>distributedlog-core</artifactId>
-  <name>Apache DistributedLog :: Core Library</name>
+  <name>Apache BookKeeper :: DistributedLog :: Core Library</name>
   <dependencies>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
@@ -62,7 +62,7 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
-      <version>${bookkeeper.version}</version>
+      <version>${project.parent.version}</version>
       <type>jar</type>
       <exclusions>
         <exclusion>
@@ -87,36 +87,12 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <version>${lombok.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.jmock</groupId>
       <artifactId>jmock</artifactId>
       <version>${jmock.version}</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>${mockito.version}</version>
-      <scope>test</scope>
-    </dependency> 
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-common</artifactId>
@@ -161,8 +137,8 @@
         </configuration>
       </plugin>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -474,13 +450,13 @@
             <version>${puppycrawl.checkstyle.version}</version>
           </dependency>
           <dependency>
-            <groupId>org.apache.distributedlog</groupId>
-            <artifactId>distributedlog-build-tools</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>buildtools</artifactId>
+            <version>${project.parent.version}</version>
           </dependency>
         </dependencies>
         <configuration>
-          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <configLocation>bookkeeper/checkstyle.xml</configLocation>
           <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
           <consoleOutput>true</consoleOutput>
           <failOnViolation>true</failOnViolation>
@@ -490,7 +466,7 @@
         </configuration>
         <executions>
           <execution>
-            <phase>test-compile</phase>
+            <phase>validate</phase>
             <goals>
               <goal>check</goal>
             </goals>
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index 6c6bd4a78..60d909600 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -36,6 +36,7 @@
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
+import org.apache.bookkeeper.proto.checksum.DigestManager;
 
 /**
  * Reader used for DL tools to read entries.
@@ -184,12 +185,12 @@ public void onFailure(Throwable throwable) {
             }
 
 
-            if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.lastAddConfirmed) {
+            if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.getLastAddConfirmed()) {
                 callback.operationComplete(BKException.Code.OK, resultList);
                 return;
             }
 
-            long entryId = recoveryData.lastAddConfirmed;
+            long entryId = recoveryData.getLastAddConfirmed();
             PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, false);
             op.future().whenComplete(readListener);
             op.submit();
@@ -211,7 +212,7 @@ public void readLacs(final LedgerHandle lh, long eid,
             } else {
                 try {
                     DigestManager.RecoveryData data = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
-                    rr = new ReadResult<Long>(eid1, BKException.Code.OK, data.lastAddConfirmed, bookieAddress);
+                    rr = new ReadResult<Long>(eid1, BKException.Code.OK, data.getLastAddConfirmed(), bookieAddress);
                 } catch (BKException.BKDigestMatchException e) {
                     rr = new ReadResult<Long>(eid1, BKException.Code.DigestMatchException, null, bookieAddress);
                 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index d87f55706..7a9fa5868 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -440,7 +440,7 @@ private synchronized boolean isClosing() {
         }
         if (!cleanup) {
             LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath);
-            closePromise.complete(null);
+            FutureUtils.complete(closePromise, null);
             return closePromise;
         }
         cleanupAndClose(closePromise);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
index 88c114c0a..4e2755f12 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
@@ -19,18 +19,14 @@
 
 import static org.apache.distributedlog.DistributedLogConfiguration.*;
 
-
-
 import org.apache.distributedlog.DistributedLogConfiguration;
-
 import org.apache.distributedlog.bk.QuorumConfig;
 import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
 
-
-
 /**
  * Whitelist dynamic configuration by adding an accessor to this class.
  */
+@SuppressWarnings("unchecked")
 public class DynamicDistributedLogConfiguration extends ConcurrentBaseConfiguration {
 
     private final ConcurrentBaseConfiguration defaultConfig;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index 30c42cb33..293ac2a0c 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -51,7 +51,6 @@
 
 import org.apache.distributedlog.acl.AccessControlManager;
 import org.apache.distributedlog.acl.DefaultAccessControlManager;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.bk.LedgerAllocator;
 import org.apache.distributedlog.bk.LedgerAllocatorUtils;
@@ -83,9 +82,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
 /**
  * Manager for ZooKeeper/BookKeeper based namespace.
  */
@@ -520,8 +516,9 @@ public SubscriptionsStore getSubscriptionsStore(String streamName) {
     // Legacy Intefaces
     //
 
+    @SuppressWarnings("deprecation")
     @Override
-    public MetadataAccessor getMetadataAccessor(String streamName)
+    public org.apache.distributedlog.api.MetadataAccessor getMetadataAccessor(String streamName)
             throws InvalidStreamNameException, IOException {
         if (getBkdlConfig().isFederatedNamespace()) {
             throw new UnsupportedOperationException();
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
index ad86b5791..481d398de 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
@@ -30,7 +30,6 @@
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClientBuilder;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
 import org.apache.distributedlog.exceptions.AlreadyClosedException;
 import org.apache.distributedlog.exceptions.DLInterruptedException;
@@ -46,7 +45,8 @@
 /**
  * access to ZKMetadata.
  */
-public class ZKMetadataAccessor implements MetadataAccessor {
+@SuppressWarnings("deprecation")
+public class ZKMetadataAccessor implements org.apache.distributedlog.api.MetadataAccessor {
     static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
     protected final String name;
     protected CompletableFuture<Void> closePromise;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
index 3d5023161..62cfb98b7 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -33,9 +33,6 @@
 import org.apache.distributedlog.exceptions.BKTransmitException;
 import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
 
-
-
-
 /**
  * BookKeeper ledger based random access entry reader.
  */
@@ -85,6 +82,7 @@ public long getLastAddConfirmed() {
                 .buildReader();
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
         CompletableFuture<List<Entry.Reader>> promise = (CompletableFuture<List<Entry.Reader>>) ctx;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
index 432b22a57..84f8ff3d3 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/namespace/NamespaceDriver.java
@@ -24,7 +24,6 @@
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
 import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.InvalidStreamNameException;
@@ -34,9 +33,6 @@
 import org.apache.distributedlog.metadata.LogStreamMetadataStore;
 import org.apache.distributedlog.util.OrderedScheduler;
 
-
-
-
 /**
  * Manager to manage all the stores required by a namespace.
  */
@@ -128,7 +124,8 @@ AccessControlManager getAccessControlManager()
      * @param streamName name of log stream.
      * @return metadata accessor for log stream {@code streamName}.
      */
-    MetadataAccessor getMetadataAccessor(String streamName)
+    @SuppressWarnings("deprecation")
+    org.apache.distributedlog.api.MetadataAccessor getMetadataAccessor(String streamName)
             throws InvalidStreamNameException, IOException;
 
     /**
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index 04c081004..8ae05fe58 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -86,7 +86,6 @@
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.auditor.DLAuditor;
@@ -106,9 +105,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
 /**
  *DistributedLogTool.
 */
@@ -470,6 +466,7 @@ protected int runCmd() throws Exception {
             return 0;
         }
 
+        @SuppressWarnings("deprecation")
         protected void printStreams(Namespace namespace) throws Exception {
             Iterator<String> streams = namespace.getLogs();
             System.out.println("Streams under " + getUri() + " : ");
@@ -480,7 +477,7 @@ protected void printStreams(Namespace namespace) throws Exception {
                 if (!printMetadata) {
                     continue;
                 }
-                MetadataAccessor accessor =
+                org.apache.distributedlog.api.MetadataAccessor accessor =
                         namespace.getNamespaceDriver().getMetadataAccessor(streamName);
                 byte[] metadata = accessor.getMetadata();
                 if (null == metadata || metadata.length == 0) {
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/DLMTestUtil.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
index 7e1464c4d..986ae9e2b 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/DLMTestUtil.java
@@ -41,7 +41,6 @@
 import org.apache.distributedlog.api.AsyncLogWriter;
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.common.util.PermitLimiter;
@@ -51,12 +50,9 @@
 import org.apache.distributedlog.namespace.NamespaceDriver;
 import org.apache.distributedlog.util.ConfUtils;
 import org.apache.distributedlog.util.Utils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
 /**
  * Utility class for setting up bookkeeper ensembles
  * and bringing individual bookies up and down.
@@ -100,9 +96,11 @@ public static DistributedLogManager createNewDLM(String name,
         return namespace.openLog(name);
     }
 
-    static MetadataAccessor createNewMetadataAccessor(DistributedLogConfiguration conf,
-                                                      String name,
-                                                      URI uri) throws Exception {
+    @SuppressWarnings("deprecation")
+    static org.apache.distributedlog.api.MetadataAccessor createNewMetadataAccessor(
+            DistributedLogConfiguration conf,
+            String name,
+            URI uri) throws Exception {
         // TODO: Metadata Accessor seems to be a legacy object which only used by kestrel
         //       (we might consider deprecating this)
         Namespace namespace = NamespaceBuilder.newBuilder()
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index c44df2f07..6a703136e 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -43,7 +43,6 @@
 import org.apache.distributedlog.api.DistributedLogManager;
 import org.apache.distributedlog.api.LogReader;
 import org.apache.distributedlog.api.LogWriter;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.api.subscription.SubscriptionsStore;
@@ -492,10 +491,12 @@ public void testCheckLogExists() throws Exception {
         namespace.close();
     }
 
+    @SuppressWarnings("deprecation")
     @Test(timeout = 60000)
     public void testMetadataAccessor() throws Exception {
         String name = "distrlog-metadata-accessor";
-        MetadataAccessor metadata = DLMTestUtil.createNewMetadataAccessor(conf, name, createDLMURI("/" + name));
+        org.apache.distributedlog.api.MetadataAccessor metadata =
+            DLMTestUtil.createNewMetadataAccessor(conf, name, createDLMURI("/" + name));
         assertEquals(name, metadata.getStreamName());
         metadata.createOrUpdateMetadata(name.getBytes());
         assertEquals(name, new String(metadata.getMetadata()));
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
index c0eecdf77..e2a970598 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
@@ -105,6 +105,7 @@ public void getEnsemblePlacementResolverClass() throws Exception {
         assertEquals(TestDNSResolver.class, conf3.getEnsemblePlacementDnsResolverClass());
     }
 
+    @SuppressWarnings("deprecation")
     @Test(timeout = 200000)
     public void validateConfiguration(){
         boolean exceptionThrown = false;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index 4aa832a08..baeaab921 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -25,10 +25,9 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Matchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -48,7 +47,6 @@
 import org.apache.distributedlog.TestZooKeeperClientBuilder;
 import org.apache.distributedlog.ZooKeeperClient;
 import org.apache.distributedlog.ZooKeeperClusterTestCase;
-import org.apache.distributedlog.api.MetadataAccessor;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
 import org.apache.distributedlog.common.concurrent.FutureUtils;
@@ -396,6 +394,7 @@ public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Except
         Utils.ioResult(getLog(uri, logName, logIdentifier, zkc, true, false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test(timeout = 60000)
     public void testCreateLogMetadataWithCustomMetadata() throws Exception {
         String logName = testName.getMethodName();
@@ -409,7 +408,8 @@ public void testCreateLogMetadataWithCustomMetadata() throws Exception {
             .uri(uri)
             .build();
 
-        MetadataAccessor accessor = namespace.getNamespaceDriver().getMetadataAccessor(logName);
+        org.apache.distributedlog.api.MetadataAccessor accessor =
+            namespace.getNamespaceDriver().getMetadataAccessor(logName);
         accessor.createOrUpdateMetadata(logName.getBytes("UTF-8"));
         accessor.close();
 
@@ -438,7 +438,7 @@ public void testGetLogSegmentsZKExceptions() throws Exception {
             Children2Callback callback = (Children2Callback) invocationOnMock.getArguments()[2];
             callback.processResult(Code.BADVERSION.intValue(), path, null, null, null);
             return null;
-        }).when(mockZk).getChildren(anyString(), anyBoolean(), any(Children2Callback.class), anyObject());
+        }).when(mockZk).getChildren(anyString(), anyBoolean(), any(Children2Callback.class), any());
 
         String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
         try {
@@ -508,7 +508,7 @@ public void testGetMissingPathsFailure() throws Exception {
             StatCallback callback = (StatCallback) invocationOnMock.getArguments()[2];
             callback.processResult(Code.BADVERSION.intValue(), path, null, null);
             return null;
-        }).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), anyObject());
+        }).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), any());
 
         try {
             FutureUtils.result(getMissingPaths(mockZkc, uri, "path/to/log"));
diff --git a/stream/distributedlog/io/dlfs/pom.xml b/stream/distributedlog/io/dlfs/pom.xml
index 775f07be1..b6df19bc2 100644
--- a/stream/distributedlog/io/dlfs/pom.xml
+++ b/stream/distributedlog/io/dlfs/pom.xml
@@ -21,24 +21,18 @@
   <parent>
     <artifactId>distributedlog</artifactId>
     <groupId>org.apache.distributedlog</groupId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>4.7.0-SNAPSHOT</version>
     <relativePath>../..</relativePath>
   </parent>
   <groupId>org.apache.distributedlog</groupId>
   <artifactId>dlfs</artifactId>
-  <name>Apache DistributedLog :: IO :: FileSystem</name>
+  <name>Apache BookKeeper :: DistributedLog :: IO :: FileSystem</name>
   <url>http://maven.apache.org</url>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.libdir>${basedir}/lib</project.libdir>
   </properties>
   <dependencies>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <version>${lombok.version}</version>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
@@ -59,12 +53,6 @@
         </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-core</artifactId>
@@ -76,8 +64,8 @@
   <build>
     <plugins>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -90,13 +78,13 @@
             <version>${puppycrawl.checkstyle.version}</version>
           </dependency>
           <dependency>
-            <groupId>org.apache.distributedlog</groupId>
-            <artifactId>distributedlog-build-tools</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>buildtools</artifactId>
+            <version>${project.parent.version}</version>
           </dependency>
         </dependencies>
         <configuration>
-          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <configLocation>bookkeeper/checkstyle.xml</configLocation>
           <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
           <consoleOutput>true</consoleOutput>
           <failOnViolation>true</failOnViolation>
@@ -105,7 +93,7 @@
         </configuration>
         <executions>
           <execution>
-            <phase>test-compile</phase>
+            <phase>validate</phase>
             <goals>
               <goal>check</goal>
             </goals>
diff --git a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
index 0670a4a31..1a056c3c9 100644
--- a/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
+++ b/stream/distributedlog/io/dlfs/src/main/java/org/apache/distributedlog/fs/DLFileSystem.java
@@ -265,7 +265,7 @@ public boolean delete(Path path, boolean recursive) throws IOException {
                 Path childPath = new Path(path, child);
                 statusList.add(getFileStatus(childPath));
             }
-            Collections.sort(statusList, Comparator.comparing(FileStatus::getPath));
+            Collections.sort(statusList, Comparator.comparing(fileStatus -> fileStatus.getPath().getName()));
             return statusList.toArray(new FileStatus[statusList.size()]);
         } catch (LogNotFoundException e) {
             throw new FileNotFoundException(path.toString());
diff --git a/stream/distributedlog/io/pom.xml b/stream/distributedlog/io/pom.xml
index be82c405a..1898b3f8b 100644
--- a/stream/distributedlog/io/pom.xml
+++ b/stream/distributedlog/io/pom.xml
@@ -22,12 +22,12 @@
   <parent>
     <groupId>org.apache.distributedlog</groupId>
     <artifactId>distributedlog</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>4.7.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <artifactId>distributedlog-io</artifactId>
   <packaging>pom</packaging>
-  <name>Apache DistributedLog :: IO</name>
+  <name>Apache BookKeeper :: DistributedLog :: IO</name>
   <modules>
     <module>dlfs</module>
   </modules>
diff --git a/stream/distributedlog/pom.xml b/stream/distributedlog/pom.xml
new file mode 100644
index 000000000..545ebaa0f
--- /dev/null
+++ b/stream/distributedlog/pom.xml
@@ -0,0 +1,242 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper</groupId>
+    <artifactId>bookkeeper</artifactId>
+    <version>4.7.0-SNAPSHOT</version>
+    <relativePath>../..</relativePath>
+  </parent>
+  <groupId>org.apache.distributedlog</groupId>
+  <artifactId>distributedlog</artifactId>
+  <packaging>pom</packaging>
+  <name>Apache BookKeeper :: DistributedLog :: Parent</name>
+  <description>
+    Apache DistributedLog provides a high performance replicated log service.
+  </description>
+  <inceptionYear>2016</inceptionYear>
+  <modules>
+    <module>common</module>
+    <module>protocol</module>
+    <module>core</module>
+    <module>io</module>
+  </modules>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+    <!-- dependencies -->
+    <codahale.metrics.version>3.0.1</codahale.metrics.version>
+    <commons-cli.version>1.1</commons-cli.version>
+    <commons-codec.version>1.6</commons-codec.version>
+    <commons-lang.version>2.6</commons-lang.version>
+    <commons-lang3.version>3.3.2</commons-lang3.version>
+    <curator.version>4.0.0</curator.version>
+    <finagle.version>6.34.0</finagle.version>
+    <freebuilder.version>1.12.3</freebuilder.version>
+    <guava.version>20.0</guava.version>
+    <jetty.version>9.3.11.v20160721</jetty.version>
+    <jmh.version>1.19</jmh.version>
+    <jmock.version>2.8.2</jmock.version>
+    <junit.version>4.8.1</junit.version>
+    <libthrift.version>0.5.0-1</libthrift.version>
+    <lz4.version>1.3.0</lz4.version>
+    <netty.version>4.1.12.Final</netty.version>
+    <scrooge.version>4.6.0</scrooge.version>
+    <slf4j.version>1.6.4</slf4j.version>
+    <prometheus.version>0.0.21</prometheus.version>
+    <stats-util.version>0.0.58</stats-util.version>
+    <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <!-- plugin dependencies -->
+    <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
+    <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
+    <coveralls-maven-plugin.version>4.1.0</coveralls-maven-plugin.version>
+    <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
+    <maven-assembly-plugin.version>2.2.1</maven-assembly-plugin.version>
+    <maven-checkstyle-plugin.version>2.17</maven-checkstyle-plugin.version>
+    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
+    <maven-deploy-plugin.version>2.7</maven-deploy-plugin.version>
+    <maven-jar-plugin.version>2.2</maven-jar-plugin.version>
+    <maven-javadoc-plugin.version>2.8</maven-javadoc-plugin.version>
+    <maven-shade-plugin.version>2.4.3</maven-shade-plugin.version>
+    <maven-source-plugin.version>2.2.1</maven-source-plugin.version>
+    <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
+    <scrooge-maven-plugin.version>3.17.0</scrooge-maven-plugin.version>
+  </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <version>${maven-javadoc-plugin.version}</version>
+        <configuration>
+          <!-- Prevent missing javadoc comments from being reported as errors -->
+          <additionalparam>-Xdoclint:none -notimestamp</additionalparam>
+          <groups>
+            <group>
+              <title>Core Library</title>
+              <packages>org.apache.distributedlog:org.apache.distributedlog.annotations:org.apache.distributedlog.callback:org.apache.distributedlog.exceptions:org.apache.distributedlog.feature:org.apache.distributedlog.io:org.apache.distributedlog.lock:org.apache.distributedlog.logsegment:org.apache.distributedlog.metadata:org.apache.distributedlog.namespace:org.apache.distributedlog.net:org.apache.distributedlog.stats:org.apache.distributedlog.api.subscription</packages>
+            </group>
+          </groups>
+          <excludePackageNames>
+            org.apache.distributedlog.acl:org.apache.distributedlog.admin:org.apache.distributedlog.auditor:org.apache.distributedlog.basic:org.apache.distributedlog.benchmark*:org.apache.distributedlog.bk:org.apache.distributedlog.ownership:org.apache.distributedlog.proxy:org.apache.distributedlog.resolver:org.apache.distributedlog.service.*:org.apache.distributedlog.config:org.apache.distributedlog.function:org.apache.distributedlog.impl*:org.apache.distributedlog.injector:org.apache.distributedlog.kafka:org.apache.distributedlog.limiter:org.apache.distributedlog.mapreduce:org.apache.distributedlog.messaging:org.apache.distributedlog.common.rate:org.apache.distributedlog.readahead:org.apache.distributedlog.selector:org.apache.distributedlog.stats:org.apache.distributedlog.thrift*:org.apache.distributedlog.tools:org.apache.distributedlog.util:org.apache.distributedlog.zk:org.apache.bookkeeper.client:org.apache.bookkeeper.stats
+          </excludePackageNames>
+        </configuration>
+        <executions>
+          <execution>
+            <id>aggregate</id>
+            <goals>
+              <goal>aggregate</goal>
+            </goals>
+            <phase>site</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>${maven-assembly-plugin.version}</version>
+        <configuration>
+          <tarLongFileMode>gnu</tarLongFileMode>
+          <descriptors>
+            <descriptor>src/assemble/src.xml</descriptor>
+          </descriptors>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
+        <version>${spotbugs-maven-plugin.version}</version>
+        <configuration>
+          <excludeFilterFile>${session.executionRootDirectory}/buildtools/src/main/resources/distributedlog/findbugsExclude.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>${maven-source-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <!-- tests are skipped by default; they only run when -Ddistributedlog is specified -->
+          <skipTests>true</skipTests>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G -Dio.netty.leakDetection.level=PARANOID</argLine>
+          <forkMode>always</forkMode>
+          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <version>${apache-rat-plugin.version}</version>
+        <configuration>
+          <excludes>
+            <exclude>ChangeLog</exclude>
+            <exclude>CONFIG.ini</exclude>
+            <exclude>GROUPS</exclude>
+            <exclude>OWNERS</exclude>
+            <exclude>dist/**/*</exclude>
+            <exclude>docs/**/*</exclude>
+            <exclude>scripts/dev/reviewers</exclude>
+            <exclude>website/**/*</exclude>
+            <exclude>**/*.md</exclude>
+            <exclude>**/apidocs/*</exclude>
+            <exclude>**/dependency-reduced-pom.xml</exclude>
+            <exclude>**/org/apache/distributedlog/thrift/*</exclude>
+            <exclude>**/logs/*.log</exclude>
+            <exclude>**/target/**/*</exclude>
+            <!-- Git -->
+            <exclude>.git/**/*</exclude>
+            <exclude>.github/**/*</exclude>
+            <exclude>.gitignore</exclude>
+            <exclude>docker/.gitignore</exclude>
+            <exclude>.idea/**/*</exclude>
+            <!-- Intellij -->
+            <exclude>**/*.iml</exclude>
+            <exclude>**/*.iws</exclude>
+            <exclude>**/*.ipr</exclude>
+            <!-- SVN -->
+            <exclude>**/.svn/**/*</exclude>
+            <!-- Maven -->
+            <exclude>.repository/**</exclude>
+            <!-- Grafana -->
+            <exclude>docker/grafana/dashboards/*.json</exclude>
+          </excludes>
+          <consoleOutput>true</consoleOutput>
+        </configuration>
+      </plugin>
+      <!-- Report jacoco coverage to coveralls.io -->
+      <plugin>
+        <groupId>org.eluder.coveralls</groupId>
+        <artifactId>coveralls-maven-plugin</artifactId>
+        <version>${coveralls-maven-plugin.version}</version>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>cobertura-maven-plugin</artifactId>
+        <version>${cobertura-maven-plugin.version}</version>
+        <configuration>
+          <formats>
+            <format>html</format>
+            <format>xml</format>
+          </formats>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>distributedlog</id>
+      <activation>
+        <property>
+          <name>distributedlog</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/stream/distributedlog/protocol/pom.xml b/stream/distributedlog/protocol/pom.xml
index bb355c698..9d3f7702f 100644
--- a/stream/distributedlog/protocol/pom.xml
+++ b/stream/distributedlog/protocol/pom.xml
@@ -20,45 +20,21 @@
   <parent>
     <groupId>org.apache.distributedlog</groupId>
     <artifactId>distributedlog</artifactId>
-    <version>0.6.0-SNAPSHOT</version>
+    <version>4.7.0-SNAPSHOT</version>
   </parent>
   <artifactId>distributedlog-protocol</artifactId>
-  <name>Apache DistributedLog :: Protocol</name>
+  <name>Apache BookKeeper :: DistributedLog :: Protocol</name>
   <dependencies>
     <dependency>
       <groupId>org.apache.distributedlog</groupId>
       <artifactId>distributedlog-common</artifactId>
       <version>${project.version}</version>
     </dependency>
-    <dependency>
-      <groupId>org.projectlombok</groupId>
-      <artifactId>lombok</artifactId>
-      <version>${lombok.version}</version>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
       <version>${netty.version}</version>
     </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
-      <version>${mockito.version}</version>
-      <scope>test</scope>
-    </dependency> 
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -75,8 +51,8 @@
         </executions>
       </plugin>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>findbugs-maven-plugin</artifactId>
+        <groupId>com.github.spotbugs</groupId>
+        <artifactId>spotbugs-maven-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -89,13 +65,13 @@
             <version>${puppycrawl.checkstyle.version}</version>
           </dependency>
           <dependency>
-            <groupId>org.apache.distributedlog</groupId>
-            <artifactId>distributedlog-build-tools</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.bookkeeper</groupId>
+            <artifactId>buildtools</artifactId>
+            <version>${project.parent.version}</version>
           </dependency>
         </dependencies>
         <configuration>
-          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <configLocation>bookkeeper/checkstyle.xml</configLocation>
           <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
           <consoleOutput>true</consoleOutput>
           <failOnViolation>true</failOnViolation>
diff --git a/tests/backward-compat/current-server-old-clients/pom.xml b/tests/backward-compat/current-server-old-clients/pom.xml
new file mode 100644
index 000000000..618cbb80c
--- /dev/null
+++ b/tests/backward-compat/current-server-old-clients/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="
+  http://maven.apache.org/POM/4.0.0
+  http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.bookkeeper.tests.backward-compat</groupId>
+    <artifactId>tests-parent</artifactId>
+    <version>4.7.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.bookkeeper.tests.backward-compat</groupId>
+  <artifactId>backward-compat-current-server-old-clients</artifactId>
+  <packaging>jar</packaging>
+  <name>Apache BookKeeper :: Tests :: Backward Compatibility :: Test old clients working on current server</name>
+
+</project>
diff --git a/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy b/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy
new file mode 100644
index 000000000..9c237ccab
--- /dev/null
+++ b/tests/backward-compat/current-server-old-clients/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatOldClients.groovy
@@ -0,0 +1,174 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.tests.backwardcompat
+
+import com.github.dockerjava.api.DockerClient
+
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
+import org.jboss.arquillian.junit.Arquillian
+import org.jboss.arquillian.test.api.ArquillianResource
+
+import org.junit.Assert
+import org.junit.Before
+import org.junit.Test
+import org.junit.runner.RunWith
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+/**
+ * Backward-compatibility tests: clients from previously released BookKeeper versions
+ * are run against bookies started with the current version.
+ *
+ * <p>Clients for a given release are obtained via
+ * {@code MavenClassLoader.forBookKeeperVersion(version)}.
+ */
+@RunWith(Arquillian.class)
+class TestCompatOldClients {
+    private static final Logger LOG = LoggerFactory.getLogger(TestCompatOldClients.class)
+    private static final byte[] PASSWD = "foobar".getBytes()
+
+    // Released client versions exercised by every test in this class.
+    // 4.1.0 doesn't work because metadata format changed
+    private def oldClientVersions = ["4.2.0", "4.2.1", "4.2.2", "4.2.3", "4.2.4",
+                                     "4.3.0", "4.3.1", "4.3.2", "4.4.0", "4.5.0", "4.5.1",
+                                     "4.6.0", "4.6.1"]
+
+    @ArquillianResource
+    DockerClient docker
+
+    // Version the bookies are started with; read from the "currentVersion" system property.
+    private String currentVersion = System.getProperty("currentVersion")
+
+    /**
+     * Makes sure the cluster is formatted (only on the first run) and that all
+     * bookies are running the current version before each test.
+     */
+    @Before
+    public void before() throws Exception {
+        // First test to run, formats metadata and bookies
+        if (BookKeeperClusterUtils.metadataFormatIfNeeded(docker, currentVersion)) {
+            BookKeeperClusterUtils.formatAllBookies(docker, currentVersion)
+        }
+        // If already started, this has no effect
+        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+    }
+
+    /**
+     * Writes and closes a ledger with the {@code oldClientVersion} client, opens it with the
+     * {@code fencingVersion} client, then verifies that the original handle rejects further
+     * writes and that the old client can still read every entry back.
+     *
+     * @param oldClientVersion version of the client that creates and later re-reads the ledger
+     * @param fencingVersion version of the client that opens the ledger after it was written
+     */
+    private void testFencingOldClient(String oldClientVersion, String fencingVersion) {
+        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker)
+
+        def oldCL = MavenClassLoader.forBookKeeperVersion(oldClientVersion)
+        def oldBK = oldCL.newBookKeeper(zookeeper)
+        def fencingCL = MavenClassLoader.forBookKeeperVersion(fencingVersion)
+        def fencingBK = fencingCL.newBookKeeper(zookeeper)
+
+        try {
+            def numEntries = 5
+            def ledger0 = oldBK.createLedger(3, 2,
+                                             oldCL.digestType("CRC32"),
+                                             PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            // Open the ledger through the second client; the returned handle itself is not needed.
+            fencingBK.openLedger(ledger0.getId(), fencingCL.digestType("CRC32"), PASSWD)
+
+            // cannot write any more
+            try {
+                ledger0.addEntry("shouldn't work".getBytes())
+                Assert.fail("Shouldn't have been able to add any more")
+            } catch (Exception e) {
+                // Compare by class name: the exception class comes from the old client's class loader.
+                Assert.assertEquals("org.apache.bookkeeper.client.BKException\$BKLedgerClosedException",
+                                    e.getClass().getName())
+            }
+
+            // should be able to open it and read it back
+            def ledger2 = oldBK.openLedger(ledger0.getId(), oldCL.digestType("CRC32"), PASSWD)
+            def entries = ledger2.readEntries(0, ledger2.getLastAddConfirmed())
+            Assert.assertEquals(numEntries, ledger2.getLastAddConfirmed() + 1 /* counts from 0 */)
+            int j = 0
+            while (entries.hasMoreElements()) {
+                def e = entries.nextElement()
+                Assert.assertEquals("foobar" + j, new String(e.getEntry()))
+                j++
+            }
+            ledger2.close()
+        } finally {
+            // Close each client before its class loader; both clients must be released.
+            oldBK.close()
+            oldCL.close()
+            fencingBK.close()
+            fencingCL.close()
+        }
+    }
+
+    /** Ledgers written by each old client are opened by a client of the current version. */
+    @Test
+    public void testNewClientFencesOldClient() throws Exception {
+        oldClientVersions.each{
+            testFencingOldClient(it, currentVersion)
+        }
+    }
+
+    /** Ledgers written by each old client are opened by a second client of the same old version. */
+    @Test
+    public void testOldClientFencesOldClient() throws Exception {
+        oldClientVersions.each{
+            testFencingOldClient(it, it)
+        }
+    }
+
+    /**
+     * Writes a ledger with the {@code writeVersion} client and verifies that the
+     * {@code readerVersion} client reads back the same number of entries with the same content.
+     *
+     * @param writeVersion version of the client that creates and writes the ledger
+     * @param readerVersion version of the client that opens and reads the ledger
+     */
+    private void testReads(String writeVersion, String readerVersion) throws Exception {
+        String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker)
+
+        def writeCL = MavenClassLoader.forBookKeeperVersion(writeVersion)
+        def writeBK = writeCL.newBookKeeper(zookeeper)
+        def readCL = MavenClassLoader.forBookKeeperVersion(readerVersion)
+        def readBK = readCL.newBookKeeper(zookeeper)
+        try {
+            def numEntries = 5
+            def ledger0 = writeBK.createLedger(3, 2,
+                                               writeCL.digestType("CRC32"),
+                                               PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            def ledger1 = readBK.openLedger(ledger0.getId(), readCL.digestType("CRC32"), PASSWD)
+
+            def entries = ledger1.readEntries(0, ledger1.getLastAddConfirmed())
+            Assert.assertEquals(numEntries, ledger1.getLastAddConfirmed() + 1 /* counts from 0 */)
+            int j = 0
+            while (entries.hasMoreElements()) {
+                def e = entries.nextElement()
+                Assert.assertEquals("foobar" + j, new String(e.getEntry()))
+                j++
+            }
+            ledger1.close()
+        } finally {
+            readBK.close()
+            readCL.close()
+            writeBK.close()
+            writeCL.close()
+        }
+    }
+
+    /** Ledgers written by the current client are read by each old client. */
+    @Test
+    public void testOldClientReadsNewClient() throws Exception {
+        oldClientVersions.each{
+            testReads(currentVersion, it)
+        }
+    }
+
+    /**
+     * Ledgers written by each old client are read by the current client.
+     *
+     * <p>NOTE: despite the method name, the writer here is the old client; the name is kept
+     * unchanged so existing test selections keep working.
+     */
+    @Test
+    public void testNewClientReadsNewClient() throws Exception {
+        oldClientVersions.each{
+            testReads(it, currentVersion)
+        }
+    }
+}
diff --git a/tests/backward-compat/current-server-old-clients/src/test/resources/arquillian.xml b/tests/backward-compat/current-server-old-clients/src/test/resources/arquillian.xml
new file mode 100644
index 000000000..f914ff2c2
--- /dev/null
+++ b/tests/backward-compat/current-server-old-clients/src/test/resources/arquillian.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<arquillian xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+            xmlns="http://jboss.org/schema/arquillian"
+            xsi:schemaLocation="http://jboss.org/schema/arquillian
+                                http://jboss.org/schema/arquillian/arquillian_1_0.xsd">
+
+  <extension qualifier="docker">
+    <property name="definitionFormat">CUBE</property>
+    <property name="dockerContainersResource">cube-definitions/3-node-all-version-unstarted.yaml</property>
+  </extension>
+
+</arquillian>
diff --git a/tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/TestCompatHierarchicalLedgerManager.groovy b/tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatHierarchicalLedgerManager.groovy
similarity index 62%
rename from tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/TestCompatHierarchicalLedgerManager.groovy
rename to tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatHierarchicalLedgerManager.groovy
index e7178a7c1..f37614485 100644
--- a/tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/TestCompatHierarchicalLedgerManager.groovy
+++ b/tests/backward-compat/hierarchical-ledger-manager/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatHierarchicalLedgerManager.groovy
@@ -15,10 +15,13 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
 
@@ -55,38 +58,42 @@ class TestCompatHierarchicalLedgerManager {
         Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, "4.2.0"))
 
         String zookeeper = BookKeeperClusterUtils.zookeeperConnectString(docker)
-        int numEntries = 10
+
         def v420CL = MavenClassLoader.forBookKeeperVersion("4.2.0")
         def v420BK = v420CL.newBookKeeper(zookeeper)
+        def currentCL = MavenClassLoader.forBookKeeperVersion(currentVersion)
+        def currentBK = currentCL.newBookKeeper(zookeeper)
+        try {
+            int numEntries = 10
 
-        def ledger0 = v420BK.createLedger(3, 2, v420CL.digestType("CRC32"), PASSWD)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry(("foobar" + i).getBytes())
-        }
-        ledger0.close()
+            def ledger0 = v420BK.createLedger(3, 2, v420CL.digestType("CRC32"), PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
 
-        Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
+            Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
 
-        BookKeeperClusterUtils.updateAllBookieConf(docker, currentVersion,
-                                                   "ledgerManagerFactoryClass",
-                                                   "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory")
-        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+            BookKeeperClusterUtils.updateAllBookieConf(docker, currentVersion,
+                                                       "ledgerManagerFactoryClass",
+                                                       "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory")
+            Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
 
-        def currentCL = MavenClassLoader.forBookKeeperVersion(currentVersion)
-        def currentBK = currentCL.newBookKeeper(zookeeper)
-
-        def ledger1 = currentBK.openLedger(ledger0.getId(), currentCL.digestType("CRC32"), PASSWD)
-        Assert.assertEquals(numEntries, ledger1.getLastAddConfirmed() + 1 /* counts from 0 */)
-        def entries = ledger1.readEntries(0, ledger1.getLastAddConfirmed())
-        int j = 0
-        while (entries.hasMoreElements()) {
-            def e = entries.nextElement()
-            Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
-            j++
+            def ledger1 = currentBK.openLedger(ledger0.getId(), currentCL.digestType("CRC32"), PASSWD)
+            Assert.assertEquals(numEntries, ledger1.getLastAddConfirmed() + 1 /* counts from 0 */)
+            def entries = ledger1.readEntries(0, ledger1.getLastAddConfirmed())
+            int j = 0
+            while (entries.hasMoreElements()) {
+                def e = entries.nextElement()
+                Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
+                j++
+            }
+            ledger1.close()
+        } finally {
+            currentBK.close()
+            currentCL.close()
+            v420BK.close()
+            v420CL.close()
         }
-        ledger1.close()
-
-        v420BK.close()
-        currentBK.close()
     }
 }
diff --git a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeWithHostnameBookieId.groovy b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
similarity index 56%
rename from tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeWithHostnameBookieId.groovy
rename to tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
index 67100def3..134d05749 100644
--- a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeWithHostnameBookieId.groovy
+++ b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
@@ -15,10 +15,13 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
 
@@ -75,56 +78,65 @@ class TestCompatUpgradeWithHostnameBookieId {
 
         Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, "4.1.0"))
 
-        // Write a ledger with v4.1.0 client
         def v410CL = MavenClassLoader.forBookKeeperVersion("4.1.0")
         def v410BK = v410CL.newBookKeeper(zookeeper)
-
-        def ledger410 = v410BK.createLedger(3, 2, v410CL.digestType("CRC32"), PASSWD)
-        writeEntries(ledger410, numEntries)
-        ledger410.close()
-
-        // Write a ledger with v4.2.0 client
         def v420CL = MavenClassLoader.forBookKeeperVersion("4.2.0")
         def v420BK = v420CL.newBookKeeper(zookeeper)
-
-        def ledger420 = v420BK.createLedger(3, 2, v420CL.digestType("CRC32"), PASSWD)
-        writeEntries(ledger420, numEntries)
-        ledger420.close()
-
-        // Stop bookies, change config to use hostname as id, restart with latest version
-        Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
-        BookKeeperClusterUtils.updateAllBookieConf(docker, currentVersion, "useHostNameAsBookieID", "true")
-        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
-
-        // Ensure we can read ledger with v4.1.0 client
-        def ledger410r = v410BK.openLedger(ledger410.getId(), v410CL.digestType("CRC32"), PASSWD)
-        assertHasEntries(ledger410r, numEntries)
-        ledger410r.close()
-
-        // Ensure we can read ledger with v4.2.0 client
-        def ledger420r = v420BK.openLedger(ledger420.getId(), v420CL.digestType("CRC32"), PASSWD)
-        assertHasEntries(ledger420r, numEntries)
-        ledger420r.close()
-
-        // Ensure we can write and read new ledgers with all client versions
         def currentCL = MavenClassLoader.forBookKeeperVersion(currentVersion)
         def currentBK = currentCL.newBookKeeper(zookeeper)
-        oldClientVersions.each{
-            LOG.info("Testing ledger creation for version {}", it)
-            def oldCL = MavenClassLoader.forBookKeeperVersion(it)
-            def oldBK = oldCL.newBookKeeper(zookeeper)
-
-            def ledger0 = oldBK.createLedger(3, 2, oldCL.digestType("CRC32"), PASSWD)
-            writeEntries(ledger0, numEntries)
-            ledger0.close()
-
-            def ledger1 = currentBK.openLedger(ledger0.getId(), currentCL.digestType("CRC32"), PASSWD)
-            assertHasEntries(ledger1, numEntries)
-            ledger1.close()
 
-            oldBK.close()
+        try {
+            // Write a ledger with v4.1.0 client
+            def ledger410 = v410BK.createLedger(3, 2, v410CL.digestType("CRC32"), PASSWD)
+            writeEntries(ledger410, numEntries)
+            ledger410.close()
+
+            // Write a ledger with v4.2.0 client
+            def ledger420 = v420BK.createLedger(3, 2, v420CL.digestType("CRC32"), PASSWD)
+            writeEntries(ledger420, numEntries)
+            ledger420.close()
+
+            // Stop bookies, change config to use hostname as id, restart with latest version
+            Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
+            BookKeeperClusterUtils.updateAllBookieConf(docker, currentVersion, "useHostNameAsBookieID", "true")
+            Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+
+            // Ensure we can read ledger with v4.1.0 client
+            def ledger410r = v410BK.openLedger(ledger410.getId(), v410CL.digestType("CRC32"), PASSWD)
+            assertHasEntries(ledger410r, numEntries)
+            ledger410r.close()
+
+            // Ensure we can read ledger with v4.2.0 client
+            def ledger420r = v420BK.openLedger(ledger420.getId(), v420CL.digestType("CRC32"), PASSWD)
+            assertHasEntries(ledger420r, numEntries)
+            ledger420r.close()
+
+            // Ensure we can write and read new ledgers with all client versions
+            oldClientVersions.each{
+                LOG.info("Testing ledger creation for version {}", it)
+                def oldCL = MavenClassLoader.forBookKeeperVersion(it)
+                def oldBK = oldCL.newBookKeeper(zookeeper)
+                try {
+                    def ledger0 = oldBK.createLedger(3, 2, oldCL.digestType("CRC32"), PASSWD)
+                    writeEntries(ledger0, numEntries)
+                    ledger0.close()
+
+                    def ledger1 = currentBK.openLedger(ledger0.getId(), currentCL.digestType("CRC32"), PASSWD)
+                    assertHasEntries(ledger1, numEntries)
+                    ledger1.close()
+                } finally {
+                    oldBK.close()
+                    oldCL.close()
+                }
+            }
+        } finally {
+            currentBK.close()
+            currentCL.close()
+            v420BK.close()
+            v420CL.close()
+            v410BK.close()
+            v410CL.close()
         }
-        currentBK.close()
     }
 
 
diff --git a/tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeOldServerInClusterWithCookies.groovy b/tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeOldServerInClusterWithCookies.groovy
similarity index 53%
rename from tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeOldServerInClusterWithCookies.groovy
rename to tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeOldServerInClusterWithCookies.groovy
index 45cdfd1ea..c2d72638e 100644
--- a/tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeOldServerInClusterWithCookies.groovy
+++ b/tests/backward-compat/old-cookie-new-cluster/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeOldServerInClusterWithCookies.groovy
@@ -15,10 +15,13 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
 
@@ -48,36 +51,40 @@ class TestCompatUpgradeOldServerInClusterWithCookies {
         Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, "4.1.0"))
         def v410CL = MavenClassLoader.forBookKeeperVersion("4.1.0")
         def v410BK = v410CL.newBookKeeper(zookeeper)
-
-        def ledger0 = v410BK.createLedger(3, 2, v410CL.digestType("CRC32"), PASSWD)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry(("foobar" + i).getBytes())
-        }
-        ledger0.close()
-
-        Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
-
-        // format metadata
-        String bookieScript = "/opt/bookkeeper/" + currentVersion + "/bin/bookkeeper"
-        Assert.assertTrue(
-            BookKeeperClusterUtils.runOnAnyBookie(docker, bookieScript,
-                                                  "shell", "metaformat", "-nonInteractive", "-force"))
-
-        // bookies shouldn't come up because the cookie doesn't have instance id
-        Assert.assertFalse(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
-
-        // format bookie
-        BookKeeperClusterUtils.runOnAllBookies(docker, bookieScript,
-                                               "shell", "bookieformat", "-nonInteractive", "-force")
-
-        // bookies should come up
-        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
-
-        // but data has been lost of course, we formatted everything
         try {
-            v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
-        } catch (Exception e) {
-            // correct behaviour
+            def ledger0 = v410BK.createLedger(3, 2, v410CL.digestType("CRC32"), PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
+
+            // format metadata
+            String bookieScript = "/opt/bookkeeper/" + currentVersion + "/bin/bookkeeper"
+            Assert.assertTrue(
+                BookKeeperClusterUtils.runOnAnyBookie(docker, bookieScript,
+                                                      "shell", "metaformat", "-nonInteractive", "-force"))
+
+            // bookies shouldn't come up because the cookie doesn't have instance id
+            Assert.assertFalse(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+
+            // format bookie
+            BookKeeperClusterUtils.runOnAllBookies(docker, bookieScript,
+                                                   "shell", "bookieformat", "-nonInteractive", "-force")
+
+            // bookies should come up
+            Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+
+            // but data has been lost of course, we formatted everything
+            try {
+                v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
+            } catch (Exception e) {
+                // correct behaviour
+            }
+        } finally {
+            v410BK.close()
+            v410CL.close()
         }
     }
 }
diff --git a/tests/backward-compat/pom.xml b/tests/backward-compat/pom.xml
index 11a9b45b0..a136ed234 100644
--- a/tests/backward-compat/pom.xml
+++ b/tests/backward-compat/pom.xml
@@ -35,5 +35,6 @@
     <module>hostname-bookieid</module>
     <module>recovery-no-password</module>
     <module>old-cookie-new-cluster</module>
+    <module>current-server-old-clients</module>
   </modules>
 </project>
diff --git a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/TestCompatRecoveryNoPassword.groovy b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
similarity index 58%
rename from tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/TestCompatRecoveryNoPassword.groovy
rename to tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
index e522d132f..d24a890b4 100644
--- a/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/TestCompatRecoveryNoPassword.groovy
+++ b/tests/backward-compat/recovery-no-password/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatRecoveryNoPassword.groovy
@@ -15,7 +15,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
@@ -35,6 +35,9 @@ import org.apache.bookkeeper.client.LedgerMetadata
 import org.apache.bookkeeper.conf.ClientConfiguration
 import org.apache.bookkeeper.net.BookieSocketAddress
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.DockerUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
 
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
@@ -154,95 +157,99 @@ class TestCompatRecoveryNoPassword {
 
         // Create a 4.1.0 client, will update /ledgers/LAYOUT
         def v410CL = MavenClassLoader.forBookKeeperVersion("4.1.0")
-        def v410Conf = v410CL.newInstance("org.apache.bookkeeper.conf.ClientConfiguration")
-        v410Conf.setZkServers(zookeeper).setLedgerManagerType("hierarchical")
-        def v410BK = v410CL.newInstance("org.apache.bookkeeper.client.BookKeeper", v410Conf)
-
-        // Start bookies
-        def bookieContainers = new ArrayList<>(DockerUtils.cubeIdsMatching("bookkeeper"))
-        Assert.assertTrue(bookieContainers.size() >= 3)
-        Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
-                docker, bookieContainers.get(0), currentVersion))
-        Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
-                docker, bookieContainers.get(1), currentVersion))
-
-        // recreate bk client so that it reads bookie list
-        v410BK.close()
-        v410BK = v410CL.newBookKeeper(zookeeper)
-
-        // Write a ledger
-        def ledger0 = v410BK.createLedger(2, 2,
-                                          v410CL.digestType("MAC"), passwdCorrect)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry("foobar".getBytes())
-        }
-        ledger0.close()
-        v410BK.close()
-
-        // start a new bookie, and kill one of the initial 2
-        def failedBookieId = new BookieSocketAddress(
-            DockerUtils.getContainerIP(docker, bookieContainers.get(0)), 3181)
-        Assert.assertTrue(BookKeeperClusterUtils.stopBookie(
-                docker, bookieContainers.get(0)))
-        Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
-                docker, bookieContainers.get(2), currentVersion))
-
-        def bkCur = new BookKeeper(zookeeper)
-
-        LedgerHandle lh = bkCur.openLedgerNoRecovery(
-            ledger0.getId(), BookKeeper.DigestType.MAC, passwdCorrect)
-        Assert.assertFalse("Should be entries missing",
-                           verifyFullyReplicated(bkCur, lh, numEntries))
-        lh.close()
-
-        ClientConfiguration adminConf = new ClientConfiguration()
-        adminConf.setZkServers(zookeeper)
-        adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.MAC)
-        adminConf.setBookieRecoveryPasswd(passwdBad)
-
-        def bka = new BookKeeperAdmin(adminConf)
-        try {
-            bka.recoverBookieData(failedBookieId)
-            Assert.fail("Shouldn't be able to recover with wrong password")
-        } catch (BKException bke) {
-            // correct behaviour
-        } finally {
-            bka.close();
-        }
-
-        adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.CRC32)
-        adminConf.setBookieRecoveryPasswd(passwdCorrect)
-
-        bka = new BookKeeperAdmin(adminConf)
         try {
-            bka.recoverBookieData(failedBookieId)
-            Assert.fail("Shouldn't be able to recover with wrong digest")
-        } catch (BKException bke) {
-            // correct behaviour
-        } finally {
-            bka.close();
-        }
+            def v410Conf = v410CL.newInstance("org.apache.bookkeeper.conf.ClientConfiguration")
+            v410Conf.setZkServers(zookeeper).setLedgerManagerType("hierarchical")
+            def v410BK = v410CL.newInstance("org.apache.bookkeeper.client.BookKeeper", v410Conf)
+
+            // Start bookies
+            def bookieContainers = new ArrayList<>(DockerUtils.cubeIdsMatching("bookkeeper"))
+            Assert.assertTrue(bookieContainers.size() >= 3)
+            Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
+                    docker, bookieContainers.get(0), currentVersion))
+            Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
+                    docker, bookieContainers.get(1), currentVersion))
+
+            // recreate bk client so that it reads bookie list
+            v410BK.close()
+            v410BK = v410CL.newBookKeeper(zookeeper)
+
+            // Write a ledger
+            def ledger0 = v410BK.createLedger(2, 2,
+                                              v410CL.digestType("MAC"), passwdCorrect)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry("foobar".getBytes())
+            }
+            ledger0.close()
+            v410BK.close()
+
+            // start a new bookie, and kill one of the initial 2
+            def failedBookieId = new BookieSocketAddress(
+                DockerUtils.getContainerIP(docker, bookieContainers.get(0)), 3181)
+            Assert.assertTrue(BookKeeperClusterUtils.stopBookie(
+                    docker, bookieContainers.get(0)))
+            Assert.assertTrue(BookKeeperClusterUtils.startBookieWithVersion(
+                    docker, bookieContainers.get(2), currentVersion))
+
+            def bkCur = new BookKeeper(zookeeper)
+            LedgerHandle lh = bkCur.openLedgerNoRecovery(
+                ledger0.getId(), BookKeeper.DigestType.MAC, passwdCorrect)
+            Assert.assertFalse("Should be entries missing",
+                               verifyFullyReplicated(bkCur, lh, numEntries))
+            lh.close()
+
+            ClientConfiguration adminConf = new ClientConfiguration()
+            adminConf.setZkServers(zookeeper)
+            adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.MAC)
+            adminConf.setBookieRecoveryPasswd(passwdBad)
+
+            def bka = new BookKeeperAdmin(adminConf)
+            try {
+                bka.recoverBookieData(failedBookieId)
+                Assert.fail("Shouldn't be able to recover with wrong password")
+            } catch (BKException bke) {
+                // correct behaviour
+            } finally {
+                bka.close();
+            }
 
-        // Check that entries are still missing
-        lh = bkCur.openLedgerNoRecovery(ledger0.getId(),
-                                        BookKeeper.DigestType.MAC, passwdCorrect)
-        Assert.assertFalse("Should be entries missing",
-                           verifyFullyReplicated(bkCur, lh, numEntries))
-        lh.close()
+            adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.CRC32)
+            adminConf.setBookieRecoveryPasswd(passwdCorrect)
+
+            bka = new BookKeeperAdmin(adminConf)
+            try {
+                bka.recoverBookieData(failedBookieId)
+                Assert.fail("Shouldn't be able to recover with wrong digest")
+            } catch (BKException bke) {
+                // correct behaviour
+            } finally {
+                bka.close();
+            }
 
+            // Check that entries are still missing
+            lh = bkCur.openLedgerNoRecovery(ledger0.getId(),
+                                            BookKeeper.DigestType.MAC, passwdCorrect)
+            Assert.assertFalse("Should be entries missing",
+                               verifyFullyReplicated(bkCur, lh, numEntries))
+            lh.close()
 
-        // Set correct password and mac, recovery will work
-        adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.MAC)
-        adminConf.setBookieRecoveryPasswd(passwdCorrect)
 
-        bka = new BookKeeperAdmin(adminConf)
-        bka.recoverBookieData(failedBookieId)
-        bka.close()
+            // Set correct password and mac, recovery will work
+            adminConf.setBookieRecoveryDigestType(BookKeeper.DigestType.MAC)
+            adminConf.setBookieRecoveryPasswd(passwdCorrect)
 
-        lh = bkCur.openLedgerNoRecovery(ledger0.getId(),
-                                        BookKeeper.DigestType.MAC, passwdCorrect)
-        Assert.assertTrue("Should have recovered everything",
-                          verifyFullyReplicated(bkCur, lh, numEntries))
-        lh.close()
+            bka = new BookKeeperAdmin(adminConf)
+            bka.recoverBookieData(failedBookieId)
+            bka.close()
+
+            lh = bkCur.openLedgerNoRecovery(ledger0.getId(),
+                                            BookKeeper.DigestType.MAC, passwdCorrect)
+            Assert.assertTrue("Should have recovered everything",
+                              verifyFullyReplicated(bkCur, lh, numEntries))
+            lh.close()
+            bkCur.close()
+        } finally {
+            v410CL.close()
+        }
     }
 }
diff --git a/tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeDirect.groovy b/tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeDirect.groovy
similarity index 51%
rename from tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeDirect.groovy
rename to tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeDirect.groovy
index 2f7fb67e4..2fe675e14 100644
--- a/tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgradeDirect.groovy
+++ b/tests/backward-compat/upgrade-direct/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeDirect.groovy
@@ -15,10 +15,13 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
 
@@ -49,44 +52,47 @@ class TestCompatUpgradeDirect {
         Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, "4.1.0"))
         def v410CL = MavenClassLoader.forBookKeeperVersion("4.1.0")
         def v410BK = v410CL.newBookKeeper(zookeeper)
-
-        def ledger0 = v410BK.createLedger(3, 2,
-                                          v410CL.digestType("CRC32"),
-                                          PASSWD)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry(("foobar" + i).getBytes())
-        }
-        ledger0.close()
-
-        // Current client shouldn't be able to write to 4.1.0 server
         def currentCL = MavenClassLoader.forBookKeeperVersion(currentVersion)
         def currentBK = currentCL.newBookKeeper(zookeeper)
-        def ledger1 = currentBK.createLedger(3, 2, currentCL.digestType("CRC32"), PASSWD)
         try {
-            ledger1.addEntry("foobar".getBytes())
-
-            Assert.fail("Shouldn't have been able to write")
-        } catch (Exception e) {
-            // correct behaviour
-        }
-
-        Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
-        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
-
-        // check that old client can read its old ledgers on new server
-        def ledger2 = v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
-        Assert.assertEquals(numEntries, ledger2.getLastAddConfirmed() + 1 /* counts from 0 */)
-        def entries = ledger2.readEntries(0, ledger2.getLastAddConfirmed())
-        int j = 0
-        while (entries.hasMoreElements()) {
-            def e = entries.nextElement()
-            Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
-            j++
+            def ledger0 = v410BK.createLedger(3, 2,
+                                              v410CL.digestType("CRC32"),
+                                              PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            // Current client shouldn't be able to write to 4.1.0 server
+            def ledger1 = currentBK.createLedger(3, 2, currentCL.digestType("CRC32"), PASSWD)
+            try {
+                ledger1.addEntry("foobar".getBytes())
+
+                Assert.fail("Shouldn't have been able to write")
+            } catch (Exception e) {
+                // correct behaviour
+            }
+
+            Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
+            Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, currentVersion))
+
+            // check that old client can read its old ledgers on new server
+            def ledger2 = v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
+            Assert.assertEquals(numEntries, ledger2.getLastAddConfirmed() + 1 /* counts from 0 */)
+            def entries = ledger2.readEntries(0, ledger2.getLastAddConfirmed())
+            int j = 0
+            while (entries.hasMoreElements()) {
+                def e = entries.nextElement()
+                Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
+                j++
+            }
+            ledger2.close()
+        } finally {
+            currentBK.close()
+            currentCL.close()
+            v410BK.close()
+            v410CL.close()
         }
-        ledger2.close()
-
-        v410BK.close()
-        currentBK.close()
     }
 
     @Test
@@ -96,27 +102,30 @@ class TestCompatUpgradeDirect {
 
         def currentCL = MavenClassLoader.forBookKeeperVersion(currentVersion)
         def currentBK = currentCL.newBookKeeper(zookeeper)
-
-        def numEntries = 5
-        def ledger0 = currentBK.createLedger(3, 2,
-                                             currentCL.digestType("CRC32"),
-                                             PASSWD)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry(("foobar" + i).getBytes())
-        }
-        ledger0.close()
-
         def v410CL = MavenClassLoader.forBookKeeperVersion("4.1.0")
         def v410BK = v410CL.newBookKeeper(zookeeper)
 
         try {
-            def ledger1 = v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
-            Assert.fail("Shouldn't have been able to open")
-        } catch (Exception e) {
-            // correct behaviour
+            def numEntries = 5
+            def ledger0 = currentBK.createLedger(3, 2,
+                                                 currentCL.digestType("CRC32"),
+                                                 PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
+            }
+            ledger0.close()
+
+            try {
+                def ledger1 = v410BK.openLedger(ledger0.getId(), v410CL.digestType("CRC32"), PASSWD)
+                Assert.fail("Shouldn't have been able to open")
+            } catch (Exception e) {
+                // correct behaviour
+            }
+        } finally {
+            v410BK.close()
+            v410CL.close()
+            currentBK.close()
+            currentCL.close()
         }
-
-        currentBK.close()
-        v410BK.close()
     }
 }
diff --git a/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgrade.groovy b/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
similarity index 64%
rename from tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgrade.groovy
rename to tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
index 911afb8a1..7933c2f1e 100644
--- a/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/TestCompatUpgrade.groovy
+++ b/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
@@ -15,10 +15,13 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.bookkeeper.tests
+package org.apache.bookkeeper.tests.backwardcompat
 
 import com.github.dockerjava.api.DockerClient
 
+import org.apache.bookkeeper.tests.BookKeeperClusterUtils
+import org.apache.bookkeeper.tests.MavenClassLoader
+
 import org.jboss.arquillian.junit.Arquillian
 import org.jboss.arquillian.test.api.ArquillianResource
 
@@ -45,51 +48,54 @@ class TestCompatUpgrade {
         LOG.info("Upgrading from {} to {}", currentlyRunning, upgradeTo)
         int numEntries = 10
         def currentRunningCL = MavenClassLoader.forBookKeeperVersion(currentlyRunning)
-
         def currentRunningBK = currentRunningCL.newBookKeeper(zookeeper)
-
-        def ledger0 = currentRunningBK.createLedger(3, 2,
-                                                    currentRunningCL.digestType("CRC32"),
-                                                    PASSWD)
-        for (int i = 0; i < numEntries; i++) {
-            ledger0.addEntry(("foobar" + i).getBytes())
-        }
-        ledger0.close()
-
-        // Check whether current client can write to old server
         def upgradedCL = MavenClassLoader.forBookKeeperVersion(upgradeTo)
         def upgradedBK = upgradedCL.newBookKeeper(zookeeper)
-        def ledger1 = upgradedBK.createLedger(3, 2, upgradedCL.digestType("CRC32"), PASSWD)
-        try {
-            ledger1.addEntry("foobar".getBytes())
 
-            if (clientCompatBroken) {
-                Assert.fail("Shouldn't have been able to write")
+        try {
+            def ledger0 = currentRunningBK.createLedger(3, 2,
+                                                        currentRunningCL.digestType("CRC32"),
+                                                        PASSWD)
+            for (int i = 0; i < numEntries; i++) {
+                ledger0.addEntry(("foobar" + i).getBytes())
             }
-        } catch (Exception e) {
-            if (!clientCompatBroken) {
-                throw e;
+            ledger0.close()
+
+            // Check whether current client can write to old server
+            def ledger1 = upgradedBK.createLedger(3, 2, upgradedCL.digestType("CRC32"), PASSWD)
+            try {
+                ledger1.addEntry("foobar".getBytes())
+
+                if (clientCompatBroken) {
+                    Assert.fail("Shouldn't have been able to write")
+                }
+            } catch (Exception e) {
+                if (!clientCompatBroken) {
+                    throw e;
+                }
             }
-        }
 
-        Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
-        Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, upgradeTo))
-
-        // check that old client can read its old ledgers on new server
-        def ledger2 = currentRunningBK.openLedger(ledger0.getId(), currentRunningCL.digestType("CRC32"),
-                                                  PASSWD)
-        Assert.assertEquals(numEntries, ledger2.getLastAddConfirmed() + 1 /* counts from 0 */)
-        def entries = ledger2.readEntries(0, ledger2.getLastAddConfirmed())
-        int j = 0
-        while (entries.hasMoreElements()) {
-            def e = entries.nextElement()
-            Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
-            j++
+            Assert.assertTrue(BookKeeperClusterUtils.stopAllBookies(docker))
+            Assert.assertTrue(BookKeeperClusterUtils.startAllBookiesWithVersion(docker, upgradeTo))
+
+            // check that old client can read its old ledgers on new server
+            def ledger2 = currentRunningBK.openLedger(ledger0.getId(), currentRunningCL.digestType("CRC32"),
+                                                      PASSWD)
+            Assert.assertEquals(numEntries, ledger2.getLastAddConfirmed() + 1 /* counts from 0 */)
+            def entries = ledger2.readEntries(0, ledger2.getLastAddConfirmed())
+            int j = 0
+            while (entries.hasMoreElements()) {
+                def e = entries.nextElement()
+                Assert.assertEquals(new String(e.getEntry()), "foobar"+ j)
+                j++
+            }
+            ledger2.close()
+        } finally {
+            upgradedBK.close()
+            upgradedCL.close()
+            currentRunningBK.close()
+            currentRunningCL.close()
         }
-        ledger2.close()
-
-        currentRunningBK.close()
-        upgradedBK.close()
     }
 
     @Test
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
index 437e5b29f..a7f9f8c03 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
@@ -80,6 +80,23 @@ public static void legacyMetadataFormat(DockerClient docker) throws Exception {
         }
     }
 
+    public static boolean metadataFormatIfNeeded(DockerClient docker, String version) throws Exception {
+        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
+            if (zk.exists("/ledgers", false) == null) {
+                String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
+                runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", "-nonInteractive");
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    public static void formatAllBookies(DockerClient docker, String version) throws Exception {
+        String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
+        BookKeeperClusterUtils.runOnAllBookies(docker, bookkeeper, "shell", "bookieformat", "-nonInteractive");
+    }
+
     public static void updateBookieConf(DockerClient docker, String containerId,
                                         String version, String key, String value) throws Exception {
         String confFile = "/opt/bookkeeper/" + version + "/conf/bk_server.conf";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services