You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2018/08/17 20:16:17 UTC

[bookkeeper] branch branch-4.7 updated: Downgraded ZK version to 3.4.13

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.7 by this push:
     new 603bdbf  Downgraded ZK version to 3.4.13
603bdbf is described below

commit 603bdbfa47e0843f4129eece58d514d416f8fcbe
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Aug 17 13:04:22 2018 -0700

    Downgraded ZK version to 3.4.13
    
    BK 4.7 has shipped with newer ZooKeeper client 3.5.3-beta. In many cases, there has been concerns regarding having a dependency on a "beta" release. Irrespective of the seriousness of these concerns for the specific case of ZK which has been in alpha/beta for a very long time, we should not have dependency on versions of software for which the team itself has marked as "not yet stable".
    
    Adding to that, there is no clear roadmap or ETA for when final 3.5.x stable release will be available.
    
     * Downgraded ZK to latest stable version 3.4.13
     * Adjusted few usages of new client APIs that were introduced in 3.5.x
    
    Author: Matteo Merli <mm...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>
    
    This closes #1601 from merlimat/downgrade-zk
    
    (cherry picked from commit f5474b1380b73819ba9f85b42d006f9ad7e45aa5)
    Signed-off-by: Matteo Merli <mm...@apache.org>
---
 .../benchmark/BenchReadThroughputLatency.java      |  25 ++---
 bookkeeper-dist/src/assemble/bin-all.xml           |   1 +
 bookkeeper-dist/src/assemble/bin-server.xml        |   1 +
 .../src/main/resources/LICENSE-all.bin.txt         |  12 ++-
 .../src/main/resources/LICENSE-server.bin.txt      |  12 ++-
 .../src/main/resources/deps/jline-0.9.94/LICENSE   |  32 ++++++
 .../bookkeeper/discover/ZKRegistrationClient.java  |  17 +--
 .../bookkeeper/zookeeper/ZooKeeperClient.java      | 119 +--------------------
 .../bookkeeper/client/TestBookieWatcher.java       |  16 +--
 .../discover/TestZkRegistrationClient.java         |  28 +----
 .../replication/TestReplicationWorker.java         |   2 +-
 .../bookkeeper/zookeeper/TestZooKeeperClient.java  |  15 +--
 pom.xml                                            |   2 +-
 .../distributedlog/bk/SimpleLedgerAllocator.java   |  35 +++---
 .../apache/distributedlog/zk/ZKWatcherManager.java |  24 -----
 .../bookkeeper/tests/containers/ZKContainer.java   |   9 +-
 .../bookkeeper/tests/BookKeeperClusterUtils.java   |  53 ++++-----
 17 files changed, 137 insertions(+), 266 deletions(-)

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index 750984f..18c73e8 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -42,7 +43,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +55,7 @@ public class BenchReadThroughputLatency {
     private static final Pattern LEDGER_PATTERN = Pattern.compile("L([0-9]+)$");
 
     private static final Comparator<String> ZK_LEDGER_COMPARE = new Comparator<String>() {
+        @Override
         public int compare(String o1, String o2) {
             try {
                 Matcher m1 = LEDGER_PATTERN.matcher(o1);
@@ -186,28 +187,21 @@ public class BenchReadThroughputLatency {
         }
 
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        final CountDownLatch connectedLatch = new CountDownLatch(1);
         final String nodepath = String.format("/ledgers/L%010d", ledger.get());
 
         final ClientConfiguration conf = new ClientConfiguration();
         conf.setReadTimeout(sockTimeout).setZkServers(servers);
 
-        try (ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
-                public void process(WatchedEvent event) {
-                    if (event.getState() == Event.KeeperState.SyncConnected
-                            && event.getType() == Event.EventType.None) {
-                        connectedLatch.countDown();
-                    }
-                }
-        })) {
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
+                .connectString(servers)
+                .sessionTimeoutMs(3000)
+                .build()) {
             final Set<String> processedLedgers = new HashSet<String>();
             zk.register(new Watcher() {
+                    @Override
                     public void process(WatchedEvent event) {
                         try {
-                            if (event.getState() == Event.KeeperState.SyncConnected
-                                && event.getType() == Event.EventType.None) {
-                                connectedLatch.countDown();
-                            } else if (event.getType() == Event.EventType.NodeCreated
+                            if (event.getType() == Event.EventType.NodeCreated
                                        && event.getPath().equals(nodepath)) {
                                 readLedger(conf, ledger.get(), passwd);
                                 shutdownLatch.countDown();
@@ -233,6 +227,7 @@ public class BenchReadThroughputLatency {
                                             final Long ledgerId = Long.valueOf(m.group(1));
                                             processedLedgers.add(ledger);
                                             Thread t = new Thread() {
+                                                @Override
                                                 public void run() {
                                                     readLedger(conf, ledgerId, passwd);
                                                 }
@@ -254,7 +249,7 @@ public class BenchReadThroughputLatency {
                         }
                     }
                 });
-            connectedLatch.await();
+
             if (ledger.get() != 0) {
                 if (zk.exists(nodepath, true) != null) {
                     readLedger(conf, ledger.get(), passwd);
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index 3a76012..c1b4f74 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -61,6 +61,7 @@
         <include>netty-4.1.22.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 10eb30c..c095f0a 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -53,6 +53,7 @@
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.22.Final/*</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index d636cab..0587ce7 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -262,7 +262,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [30]
 - lib/org.apache.commons-commons-collections4-4.1.jar [31]
 - lib/org.apache.commons-commons-lang3-3.3.2.jar [32]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [33]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [33]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [34]
@@ -298,6 +298,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
 - lib/org.inferred-freebuilder-1.14.9.jar [48]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [49]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [50]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -330,7 +331,7 @@ Apache Software License, Version 2.
 [30] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [31] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [32] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[33] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[33] Source available at https://github.com/apache/zookeeper/tree/release-3.4.13
 [34] Source available at https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [35] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [36] Source available at https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -347,6 +348,7 @@ Apache Software License, Version 2.
 [47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
 [48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [49] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[50] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 
 ------------------------------------------------------------------------------------
@@ -547,4 +549,10 @@ license. For details, see deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "2-clause BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index a012f64..278eb82 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -227,7 +227,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [17]
 - lib/org.apache.commons-commons-collections4-4.1.jar [18]
 - lib/org.apache.commons-commons-lang3-3.3.2.jar [19]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [20]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [20]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [21]
@@ -263,6 +263,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
 - lib/org.inferred-freebuilder-1.14.9.jar [35]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [36]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [37]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -283,7 +284,7 @@ Apache Software License, Version 2.
 [17] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [18] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [19] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[20] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[20] Source available at https://github.com/apache/zookeeper/tree/release-3.4.13
 [21] Source available at https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [22] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [23] Source available at https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -300,6 +301,7 @@ Apache Software License, Version 2.
 [34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
 [35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [36] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[37] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.22.Final.jar bundles some 3rd party dependencies
@@ -435,3 +437,9 @@ license. For details, see deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "2-clause BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
diff --git a/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
new file mode 100644
index 0000000..246f54f
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
@@ -0,0 +1,32 @@
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index ff48a08..ce19751 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -66,7 +66,7 @@ public class ZKRegistrationClient implements RegistrationClient {
 
         private final String regPath;
         private final Set<RegistrationListener> listeners;
-        private boolean closed = false;
+        private volatile boolean closed = false;
         private Set<BookieSocketAddress> bookies = null;
         private Version version = Version.NEW;
         private final CompletableFuture<Void> firstRunFuture;
@@ -154,23 +154,12 @@ public class ZKRegistrationClient implements RegistrationClient {
             scheduleWatchTask(0L);
         }
 
-        synchronized boolean isClosed() {
+        boolean isClosed() {
             return closed;
         }
 
         @Override
-        public synchronized void close() {
-            if (closed) {
-                return;
-            }
-            zk.removeWatches(
-                regPath,
-                this,
-                WatcherType.Children,
-                true,
-                (rc, path, ctx) -> {},
-                null
-            );
+        public void close() {
             closed = true;
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
index 24693ea..be037f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
@@ -44,7 +44,6 @@ import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -68,7 +67,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Provide a zookeeper client to handle session expire.
  */
-public class ZooKeeperClient extends ZooKeeper implements Watcher {
+public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class);
 
@@ -761,74 +760,6 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
     }
 
     @Override
-    public String create(final String path,
-                         final byte[] data,
-                         final List<ACL> acl,
-                         final CreateMode createMode,
-                         final Stat stat)
-            throws KeeperException, InterruptedException {
-        return ZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
-
-            @Override
-            public String call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return ZooKeeperClient.super.create(path, data, acl, createMode);
-                }
-                return zkHandle.create(path, data, acl, createMode);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-
-        }, operationRetryPolicy, rateLimiter, createStats);
-    }
-
-    @Override
-    public void create(final String path,
-                       final byte[] data,
-                       final List<ACL> acl,
-                       final CreateMode createMode,
-                       final Create2Callback cb,
-                       final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
-
-            final Create2Callback createCb = new Create2Callback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                    ZooWorker worker = (ZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, name, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
-                } else {
-                    zkHandle.create(path, data, acl, createMode, createCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
     public void delete(final String path, final int version) throws KeeperException, InterruptedException {
         ZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
 
@@ -1427,52 +1358,4 @@ public class ZooKeeperClient extends ZooKeeper implements Watcher {
         proc.run();
     }
 
-    @Override
-    public void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, local);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeWatches(String path,
-                              Watcher watcher,
-                              WatcherType watcherType,
-                              boolean local,
-                              VoidCallback cb,
-                              Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, local, cb, ctx);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local, cb, ctx);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local, cb, ctx);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local, cb, ctx);
-        }
-    }
-
-
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index e8e82b7..63de59c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -26,6 +26,8 @@ import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
+
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -69,7 +71,7 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
     @Test
     public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
         final int timeout = 2000;
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(timeout)
                 .build()) {
@@ -81,7 +83,9 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
     public void testBookieWatcherDieWhenSessionExpired() throws Exception {
         final int timeout = 2000;
         final CountDownLatch connectLatch = new CountDownLatch(1);
-        try (ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
+
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
             @Override
             public void process(WatchedEvent watchedEvent) {
                 if (EventType.None == watchedEvent.getType()
@@ -89,10 +93,10 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
                     connectLatch.countDown();
                 }
             }
-        })) {
-            connectLatch.await();
-            runBookieWatcherWhenSessionExpired(zk, timeout, false);
-        }
+        });
+
+        connectLatch.await();
+        runBookieWatcherWhenSessionExpired(zk, timeout, false);
     }
 
     private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boolean reconnectable)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
index 99336b4..1f5e0b6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
@@ -33,8 +33,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -42,6 +40,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
+
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
@@ -49,7 +48,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.client.BKException.ZKException;
 import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
@@ -60,12 +61,10 @@ import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -94,6 +93,7 @@ public class TestZkRegistrationClient extends MockZooKeeperTestCase {
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController controller;
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -372,17 +372,6 @@ public class TestZkRegistrationClient extends MockZooKeeperTestCase {
             zkRegistrationClient.unwatchReadOnlyBookies(secondListener);
             assertEquals(1, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
         }
-        // the watch task will not be closed since there is still a listener
-        verify(mockZk, times(0))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(isWritable ? zkRegistrationClient.getWatchWritableBookiesTask()
-                    : zkRegistrationClient.getWatchReadOnlyBookiesTask()),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
 
         // trigger watcher
         notifyWatchedEvent(
@@ -421,15 +410,6 @@ public class TestZkRegistrationClient extends MockZooKeeperTestCase {
         }
         // the watch task will not be closed since there is still a listener
         assertTrue(expectedWatcher.isClosed());
-        verify(mockZk, times(1))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(expectedWatcher),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index 609e154..cd9e065 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -523,7 +523,7 @@ public class TestReplicationWorker extends BookKeeperClusterTestCase {
      */
     @Test
     public void testRWZKConnectionLost() throws Exception {
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(10000)
                 .build()) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index 8e88418..bb9554a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.test.ZooKeeperUtil;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -226,7 +225,7 @@ public class TestZooKeeperClient extends TestCase {
 
         expireZooKeeperSession(client, timeout);
         logger.info("Create children under znode " + path);
-        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new Stat());
+        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         expireZooKeeperSession(client, timeout);
         List<String> children = client.getChildren(path, false, newStat);
@@ -791,16 +790,10 @@ public class TestZooKeeperClient extends TestCase {
         expireZooKeeperSession(client, timeout);
         logger.info("Create znode " + path);
         final CountDownLatch create2Latch = new CountDownLatch(1);
-        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
-                new Create2Callback() {
-
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    create2Latch.countDown();
-                }
+        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
+            if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                create2Latch.countDown();
             }
-
         }, null);
         create2Latch.await();
         logger.info("Created znode " + path);
diff --git a/pom.xml b/pom.xml
index 48654c4..314efdc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,7 +155,7 @@
     <testcontainers.version>1.7.0</testcontainers.version>
     <twitter-server.version>1.29.0</twitter-server.version>
     <vertx.version>3.4.1</vertx.version>
-    <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
     <!-- plugin dependencies -->
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index 367a97d..eb03923 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -18,12 +18,14 @@
 package org.apache.distributedlog.bk;
 
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
+
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -41,7 +43,6 @@ import org.apache.distributedlog.zk.ZKTransaction;
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,21 +130,19 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
             final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
             zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
                     zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new org.apache.zookeeper.AsyncCallback.Create2Callback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
-                                        new LongVersion(stat.getVersion())));
-                            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                FutureUtils.proxyTo(
-                                  Utils.zkGetData(zkc, allocatePath, false),
-                                  promise
-                                );
-                            } else {
-                                promise.completeExceptionally(Utils.zkException(
-                                        KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
-                            }
+                    (rc, path, ctx, name) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            // Since the z-node was just created, we are sure at this point the version is 0
+                            promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
+                                    new LongVersion(0)));
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                            FutureUtils.proxyTo(
+                              Utils.zkGetData(zkc, allocatePath, false),
+                              promise
+                            );
+                        } else {
+                            promise.completeExceptionally(Utils.zkException(
+                                    KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
                         }
                     }, null);
             return promise;
@@ -489,7 +488,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
                     @Override
                     public void onFailure(Throwable cause) {
                         LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                        closePromise.complete(null);
+                        FutureUtils.complete(closePromise, null);
                     }
                 });
             }
@@ -497,7 +496,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
             @Override
             public void onFailure(Throwable cause) {
                 LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                closePromise.complete(null);
+                FutureUtils.complete(closePromise, null);
             }
         });
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index 8c350c9..cbdae1d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -25,8 +25,6 @@ import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -165,28 +163,6 @@ public class ZKWatcherManager implements Watcher {
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
             }
             if (watchers.isEmpty()) {
-                // best-efforts to remove watches
-                try {
-                    if (null != zkc && removeFromServer) {
-                        zkc.get().removeWatches(path, this, WatcherType.Children, true,
-                                new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    logger.debug("Successfully removed children watches from {}", path);
-                                } else {
-                                    logger.debug("Encountered exception on removing children watches from {}",
-                                            path, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                }
                 childWatches.remove(path, watchers);
             }
         }
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
index 8f5fb8a..9481ec8 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
@@ -22,7 +22,7 @@ import static java.time.temporal.ChronoUnit.SECONDS;
 
 import java.time.Duration;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.apache.bookkeeper.tests.containers.wait.ZKWaitStrategy;
 
 @Slf4j
 public class ZKContainer<SELF extends ZKContainer<SELF>> extends MetadataStoreContainer<SELF> {
@@ -59,11 +59,8 @@ public class ZKContainer<SELF extends ZKContainer<SELF>> extends MetadataStoreCo
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/commands/ruok")
-            .forStatusCode(200)
-            .forPort(ZK_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        this.waitStrategy = new ZKWaitStrategy(ZK_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
 
         this.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withHostName(HOST_NAME));
 
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
index 7020fff..4361f79 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/BookKeeperClusterUtils.java
@@ -29,6 +29,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.net.Socket;
 
+import lombok.Cleanup;
+
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -77,21 +79,21 @@ public class BookKeeperClusterUtils {
     }
 
     public static void legacyMetadataFormat(DockerClient docker) throws Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     public static boolean metadataFormatIfNeeded(DockerClient docker, String version) throws Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists("/ledgers", false) == null) {
-                String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
-                runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", "-nonInteractive");
-                return true;
-            } else {
-                return false;
-            }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists("/ledgers", false) == null) {
+            String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
+            runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", "-nonInteractive");
+            return true;
+        } else {
+            return false;
         }
     }
 
@@ -100,17 +102,18 @@ public class BookKeeperClusterUtils {
                                                      String namespace) throws Exception {
         String zkServers = BookKeeperClusterUtils.zookeeperConnectString(docker);
         String dlogUri = "distributedlog://" + zkServers + namespace;
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists(namespace, false) == null) {
-                String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
-
-                runOnAnyBookie(docker, dlog,
-                    "admin",
-                    "bind",
-                    "-l", "/ledgers",
-                    "-s", zkServers,
-                    "-c", dlogUri);
-            }
+
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists(namespace, false) == null) {
+            String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
+
+            runOnAnyBookie(docker, dlog,
+                "admin",
+                "bind",
+                "-l", "/ledgers",
+                "-s", zkServers,
+                "-c", dlogUri);
         }
         return dlogUri;
     }
@@ -167,7 +170,9 @@ public class BookKeeperClusterUtils {
         long timeoutMillis = timeoutUnit.toMillis(timeout);
         long pollMillis = 1000;
         String bookieId = DockerUtils.getContainerIP(docker, containerId) + ":3181";
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
+        try {
+            @Cleanup
+            ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
             String path = "/ledgers/available/" + bookieId;
             while (timeoutMillis > 0) {
                 if ((zk.exists(path, false) != null) == upOrDown) {