You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/17 20:04:37 UTC

[GitHub] merlimat closed pull request #1601: Downgraded ZK version to 3.4.13

merlimat closed pull request #1601: Downgraded ZK version to 3.4.13
URL: https://github.com/apache/bookkeeper/pull/1601
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
index 750984f4ab..18c73e88e1 100644
--- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
+++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchReadThroughputLatency.java
@@ -35,6 +35,7 @@
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -42,7 +43,6 @@
 import org.apache.commons.cli.PosixParser;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +55,7 @@
     private static final Pattern LEDGER_PATTERN = Pattern.compile("L([0-9]+)$");
 
     private static final Comparator<String> ZK_LEDGER_COMPARE = new Comparator<String>() {
+        @Override
         public int compare(String o1, String o2) {
             try {
                 Matcher m1 = LEDGER_PATTERN.matcher(o1);
@@ -186,28 +187,21 @@ public static void main(String[] args) throws Exception {
         }
 
         final CountDownLatch shutdownLatch = new CountDownLatch(1);
-        final CountDownLatch connectedLatch = new CountDownLatch(1);
         final String nodepath = String.format("/ledgers/L%010d", ledger.get());
 
         final ClientConfiguration conf = new ClientConfiguration();
         conf.setReadTimeout(sockTimeout).setZkServers(servers);
 
-        try (ZooKeeper zk = new ZooKeeper(servers, 3000, new Watcher() {
-                public void process(WatchedEvent event) {
-                    if (event.getState() == Event.KeeperState.SyncConnected
-                            && event.getType() == Event.EventType.None) {
-                        connectedLatch.countDown();
-                    }
-                }
-        })) {
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
+                .connectString(servers)
+                .sessionTimeoutMs(3000)
+                .build()) {
             final Set<String> processedLedgers = new HashSet<String>();
             zk.register(new Watcher() {
+                    @Override
                     public void process(WatchedEvent event) {
                         try {
-                            if (event.getState() == Event.KeeperState.SyncConnected
-                                && event.getType() == Event.EventType.None) {
-                                connectedLatch.countDown();
-                            } else if (event.getType() == Event.EventType.NodeCreated
+                            if (event.getType() == Event.EventType.NodeCreated
                                        && event.getPath().equals(nodepath)) {
                                 readLedger(conf, ledger.get(), passwd);
                                 shutdownLatch.countDown();
@@ -233,6 +227,7 @@ public void process(WatchedEvent event) {
                                             final Long ledgerId = Long.valueOf(m.group(1));
                                             processedLedgers.add(ledger);
                                             Thread t = new Thread() {
+                                                @Override
                                                 public void run() {
                                                     readLedger(conf, ledgerId, passwd);
                                                 }
@@ -254,7 +249,7 @@ public void run() {
                         }
                     }
                 });
-            connectedLatch.await();
+
             if (ledger.get() != 0) {
                 if (zk.exists(nodepath, true) != null) {
                     readLedger(conf, ledger.get(), passwd);
diff --git a/bookkeeper-dist/src/assemble/bin-all.xml b/bookkeeper-dist/src/assemble/bin-all.xml
index a51d0e0651..7b047c4468 100644
--- a/bookkeeper-dist/src/assemble/bin-all.xml
+++ b/bookkeeper-dist/src/assemble/bin-all.xml
@@ -61,6 +61,7 @@
         <include>netty-4.1.22.Final/*</include>
         <include>paranamer-2.8/LICENSE.txt</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>scala-library-2.11.7/LICENSE.md</include>
         <include>scala-parser-combinators_2.11-1.0.4/LICENSE.md</include>
diff --git a/bookkeeper-dist/src/assemble/bin-server.xml b/bookkeeper-dist/src/assemble/bin-server.xml
index 761af96a10..aa7d1b89c8 100644
--- a/bookkeeper-dist/src/assemble/bin-server.xml
+++ b/bookkeeper-dist/src/assemble/bin-server.xml
@@ -53,6 +53,7 @@
         <include>javax.servlet-api-3.1.0/CDDL+GPL-1.1</include>
         <include>netty-4.1.22.Final/*</include>
         <include>protobuf-3.0.0/LICENSE</include>
+        <include>jline-0.9.94/LICENSE</include>
         <include>protobuf-3.5.1/LICENSE</include>
         <include>slf4j-1.7.25/LICENSE.txt</include>
       </includes>
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index f1bcc84790..5303d52862 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -262,7 +262,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [30]
 - lib/org.apache.commons-commons-collections4-4.1.jar [31]
 - lib/org.apache.commons-commons-lang3-3.6.jar [32]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [33]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [33]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [34]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [34]
@@ -298,6 +298,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [47]
 - lib/org.inferred-freebuilder-1.14.9.jar [48]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [49]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [50]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -330,7 +331,7 @@ Apache Software License, Version 2.
 [30] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [31] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [32] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[33] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[33] Source available at https://github.com/apache/zookeeper/tree/release-3.4.13
 [34] Source available at https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [35] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [36] Source available at https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -347,6 +348,7 @@ Apache Software License, Version 2.
 [47] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
 [48] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [49] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[50] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 
 ------------------------------------------------------------------------------------
@@ -547,4 +549,10 @@ license. For details, see deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "3-clause BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
 
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index d59d322b16..eac571468e 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -227,7 +227,7 @@ Apache Software License, Version 2.
 - lib/net.java.dev.jna-jna-3.2.7.jar [17]
 - lib/org.apache.commons-commons-collections4-4.1.jar [18]
 - lib/org.apache.commons-commons-lang3-3.6.jar [19]
-- lib/org.apache.zookeeper-zookeeper-3.5.3-beta.jar [20]
+- lib/org.apache.zookeeper-zookeeper-3.4.13.jar [20]
 - lib/org.eclipse.jetty-jetty-http-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-io-9.4.5.v20170502.jar [21]
 - lib/org.eclipse.jetty-jetty-security-9.4.5.v20170502.jar [21]
@@ -263,6 +263,7 @@ Apache Software License, Version 2.
 - lib/org.apache.curator-curator-recipes-4.0.1.jar [34]
 - lib/org.inferred-freebuilder-1.14.9.jar [35]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [36]
+- lib/org.apache.yetus-audience-annotations-0.5.0.jar [37]
 
 [1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -283,7 +284,7 @@ Apache Software License, Version 2.
 [17] Source available at https://github.com/java-native-access/jna/tree/3.2.7
 [18] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-collections.git;a=tag;h=a3a5ad
 [19] Source available at https://git-wip-us.apache.org/repos/asf?p=commons-lang.git;a=tag;h=3ad2e8
-[20] Source available at https://github.com/apache/zookeeper/tree/release-3.5.3
+[20] Source available at https://github.com/apache/zookeeper/tree/release-3.4.13
 [21] Source available at https://github.com/eclipse/jetty.project/tree/jetty-9.4.5.v20170502
 [22] Source available at https://github.com/facebook/rocksdb/tree/v5.13.1
 [23] Source available at https://github.com/cbeust/jcommander/tree/jcommander-1.48
@@ -300,6 +301,7 @@ Apache Software License, Version 2.
 [34] Source available at https://github.com/apache/curator/tree/apache-curator-4.0.1
 [35] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [36] Source available at https://github.com/google/error-prone/tree/v2.1.2
+[37] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
 
 ------------------------------------------------------------------------------------
 lib/io.netty-netty-all-4.1.22.Final.jar bundles some 3rd party dependencies
@@ -435,3 +437,9 @@ license. For details, see deps/google-auth-library-credentials-0.9.0/LICENSE
 Bundled as
   - lib/com.google.auth-google-auth-library-credentials-0.9.0.jar
 Source available at https://github.com/google/google-auth-library-java/tree/0.9.0
+------------------------------------------------------------------------------------
+This product bundles the JLine Library, which is available under a "3-clause BSD"
+license. For details, see deps/jline-0.9.94/LICENSE
+
+Bundled as
+  - lib/jline-jline-0.9.94.jar
diff --git a/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
new file mode 100644
index 0000000000..246f54f736
--- /dev/null
+++ b/bookkeeper-dist/src/main/resources/deps/jline-0.9.94/LICENSE
@@ -0,0 +1,32 @@
+Copyright (c) 2002-2006, Marc Prud'hommeaux <mw...@cornell.edu>
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or
+without modification, are permitted provided that the following
+conditions are met:
+
+Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with
+the distribution.
+
+Neither the name of JLine nor the names of its contributors
+may be used to endorse or promote products derived from this
+software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
+AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
+OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
+IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
+OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
index ff48a08b3a..ce19751c9d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java
@@ -66,7 +66,7 @@
 
         private final String regPath;
         private final Set<RegistrationListener> listeners;
-        private boolean closed = false;
+        private volatile boolean closed = false;
         private Set<BookieSocketAddress> bookies = null;
         private Version version = Version.NEW;
         private final CompletableFuture<Void> firstRunFuture;
@@ -154,23 +154,12 @@ public void process(WatchedEvent event) {
             scheduleWatchTask(0L);
         }
 
-        synchronized boolean isClosed() {
+        boolean isClosed() {
             return closed;
         }
 
         @Override
-        public synchronized void close() {
-            if (closed) {
-                return;
-            }
-            zk.removeWatches(
-                regPath,
-                this,
-                WatcherType.Children,
-                true,
-                (rc, path, ctx) -> {},
-                null
-            );
+        public void close() {
             closed = true;
         }
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
index 24693eae2f..be037f506c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java
@@ -44,7 +44,6 @@
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
 import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.MultiCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
@@ -68,7 +67,7 @@
 /**
  * Provide a zookeeper client to handle session expire.
  */
-public class ZooKeeperClient extends ZooKeeper implements Watcher {
+public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable {
 
     private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class);
 
@@ -760,74 +759,6 @@ public String toString() {
         proc.run();
     }
 
-    @Override
-    public String create(final String path,
-                         final byte[] data,
-                         final List<ACL> acl,
-                         final CreateMode createMode,
-                         final Stat stat)
-            throws KeeperException, InterruptedException {
-        return ZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
-
-            @Override
-            public String call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return ZooKeeperClient.super.create(path, data, acl, createMode);
-                }
-                return zkHandle.create(path, data, acl, createMode);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-
-        }, operationRetryPolicy, rateLimiter, createStats);
-    }
-
-    @Override
-    public void create(final String path,
-                       final byte[] data,
-                       final List<ACL> acl,
-                       final CreateMode createMode,
-                       final Create2Callback cb,
-                       final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
-
-            final Create2Callback createCb = new Create2Callback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                    ZooWorker worker = (ZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, name, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
-                } else {
-                    zkHandle.create(path, data, acl, createMode, createCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
     @Override
     public void delete(final String path, final int version) throws KeeperException, InterruptedException {
         ZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
@@ -1427,52 +1358,4 @@ public String toString() {
         proc.run();
     }
 
-    @Override
-    public void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, local);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeWatches(String path,
-                              Watcher watcher,
-                              WatcherType watcherType,
-                              boolean local,
-                              VoidCallback cb,
-                              Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeWatches(path, watcher, watcherType, local, cb, ctx);
-        } else {
-            zkHandle.removeWatches(path, watcher, watcherType, local, cb, ctx);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean local)
-            throws InterruptedException, KeeperException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local);
-        }
-    }
-
-    @Override
-    public void removeAllWatches(String path, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            ZooKeeperClient.super.removeAllWatches(path, watcherType, local, cb, ctx);
-        } else {
-            zkHandle.removeAllWatches(path, watcherType, local, cb, ctx);
-        }
-    }
-
-
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index e8e82b71ad..63de59cb88 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -26,6 +26,8 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
+
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
@@ -69,7 +71,7 @@ public void process(WatchedEvent event) {
     @Test
     public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
         final int timeout = 2000;
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(timeout)
                 .build()) {
@@ -81,7 +83,9 @@ public void testBookieWatcherSurviveWhenSessionExpired() throws Exception {
     public void testBookieWatcherDieWhenSessionExpired() throws Exception {
         final int timeout = 2000;
         final CountDownLatch connectLatch = new CountDownLatch(1);
-        try (ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
+
+        @Cleanup
+        ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), timeout, new Watcher() {
             @Override
             public void process(WatchedEvent watchedEvent) {
                 if (EventType.None == watchedEvent.getType()
@@ -89,10 +93,10 @@ public void process(WatchedEvent watchedEvent) {
                     connectLatch.countDown();
                 }
             }
-        })) {
-            connectLatch.await();
-            runBookieWatcherWhenSessionExpired(zk, timeout, false);
-        }
+        });
+
+        connectLatch.await();
+        runBookieWatcherWhenSessionExpired(zk, timeout, false);
     }
 
     private void runBookieWatcherWhenSessionExpired(ZooKeeper zk, int timeout, boolean reconnectable)
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
index 99336b478d..1f5e0b6896 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/discover/TestZkRegistrationClient.java
@@ -33,8 +33,6 @@
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -42,6 +40,7 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.Lists;
+
 import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
@@ -49,7 +48,9 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.bookkeeper.client.BKException.ZKException;
 import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
@@ -60,12 +61,10 @@
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.Watcher.WatcherType;
 import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
@@ -94,6 +93,7 @@
     private ScheduledExecutorService mockExecutor;
     private MockExecutorController controller;
 
+    @Override
     @Before
     public void setup() throws Exception {
         super.setup();
@@ -372,17 +372,6 @@ private void testWatchBookiesSuccess(boolean isWritable)
             zkRegistrationClient.unwatchReadOnlyBookies(secondListener);
             assertEquals(1, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
         }
-        // the watch task will not be closed since there is still a listener
-        verify(mockZk, times(0))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(isWritable ? zkRegistrationClient.getWatchWritableBookiesTask()
-                    : zkRegistrationClient.getWatchReadOnlyBookiesTask()),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
 
         // trigger watcher
         notifyWatchedEvent(
@@ -421,15 +410,6 @@ private void testWatchBookiesSuccess(boolean isWritable)
         }
         // the watch task is expected to be closed here (presumably the last listener was removed above)
         assertTrue(expectedWatcher.isClosed());
-        verify(mockZk, times(1))
-            .removeWatches(
-                eq(isWritable ? regPath : regReadonlyPath),
-                same(expectedWatcher),
-                eq(WatcherType.Children),
-                eq(true),
-                any(VoidCallback.class),
-                any()
-            );
     }
 
     @Test
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
index f64456bb60..e4b1232e77 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java
@@ -655,7 +655,7 @@ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR()
      */
     @Test
     public void testRWZKConnectionLost() throws Exception {
-        try (ZooKeeper zk = ZooKeeperClient.newBuilder()
+        try (ZooKeeperClient zk = ZooKeeperClient.newBuilder()
                 .connectString(zkUtil.getZooKeeperConnectString())
                 .sessionTimeoutMs(10000)
                 .build()) {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
index 8e884183c7..bb9554ac2e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java
@@ -36,7 +36,6 @@
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.AsyncCallback.ACLCallback;
 import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.Create2Callback;
 import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.StringCallback;
@@ -226,7 +225,7 @@ public void testRetrySyncOperations() throws Exception {
 
         expireZooKeeperSession(client, timeout);
         logger.info("Create children under znode " + path);
-        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new Stat());
+        client.create(path + "/children2", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         expireZooKeeperSession(client, timeout);
         List<String> children = client.getChildren(path, false, newStat);
@@ -791,16 +790,10 @@ public void processResult(int rc, String path, Object ctx, String name) {
         expireZooKeeperSession(client, timeout);
         logger.info("Create znode " + path);
         final CountDownLatch create2Latch = new CountDownLatch(1);
-        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
-                new Create2Callback() {
-
-            @Override
-            public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                    create2Latch.countDown();
-                }
+        client.create(path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
+            if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                create2Latch.countDown();
             }
-
         }, null);
         create2Latch.await();
         logger.info("Created znode " + path);
diff --git a/pom.xml b/pom.xml
index 471cec5b71..ee2b745602 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@
     <testcontainers.version>1.7.0</testcontainers.version>
     <twitter-server.version>1.29.0</twitter-server.version>
     <vertx.version>3.4.1</vertx.version>
-    <zookeeper.version>3.5.3-beta</zookeeper.version>
+    <zookeeper.version>3.4.13</zookeeper.version>
     <!-- plugin dependencies -->
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index 367a97d309..eb03923a10 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -18,12 +18,14 @@
 package org.apache.distributedlog.bk;
 
 import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
+
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -41,7 +43,6 @@
 import org.apache.distributedlog.zk.ZKVersionedSetOp;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,21 +130,19 @@ public ConcurrentObtainException(Phase phase, String msg) {
             final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
             zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
                     zkc.getDefaultACL(), CreateMode.PERSISTENT,
-                    new org.apache.zookeeper.AsyncCallback.Create2Callback() {
-                        @Override
-                        public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
-                            if (KeeperException.Code.OK.intValue() == rc) {
-                                promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
-                                        new LongVersion(stat.getVersion())));
-                            } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                                FutureUtils.proxyTo(
-                                  Utils.zkGetData(zkc, allocatePath, false),
-                                  promise
-                                );
-                            } else {
-                                promise.completeExceptionally(Utils.zkException(
-                                        KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
-                            }
+                    (rc, path, ctx, name) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            // Since the z-node was just created, we are sure at this point the version is 0
+                            promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
+                                    new LongVersion(0)));
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                            FutureUtils.proxyTo(
+                              Utils.zkGetData(zkc, allocatePath, false),
+                              promise
+                            );
+                        } else {
+                            promise.completeExceptionally(Utils.zkException(
+                                    KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
                         }
                     }, null);
             return promise;
@@ -489,7 +488,7 @@ public void onSuccess(List<Void> values) {
                     @Override
                     public void onFailure(Throwable cause) {
                         LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                        closePromise.complete(null);
+                        FutureUtils.complete(closePromise, null);
                     }
                 });
             }
@@ -497,7 +496,7 @@ public void onFailure(Throwable cause) {
             @Override
             public void onFailure(Throwable cause) {
                 LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
-                closePromise.complete(null);
+                FutureUtils.complete(closePromise, null);
             }
         });
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
index 8c350c9f4b..cbdae1d735 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/zk/ZKWatcherManager.java
@@ -25,8 +25,6 @@
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -165,28 +163,6 @@ public void unregisterChildWatcher(String path, Watcher watcher, boolean removeF
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
             }
             if (watchers.isEmpty()) {
-                // best-efforts to remove watches
-                try {
-                    if (null != zkc && removeFromServer) {
-                        zkc.get().removeWatches(path, this, WatcherType.Children, true,
-                                new AsyncCallback.VoidCallback() {
-                            @Override
-                            public void processResult(int rc, String path, Object ctx) {
-                                if (KeeperException.Code.OK.intValue() == rc) {
-                                    logger.debug("Successfully removed children watches from {}", path);
-                                } else {
-                                    logger.debug("Encountered exception on removing children watches from {}",
-                                            path, KeeperException.create(KeeperException.Code.get(rc)));
-                                }
-                            }
-                        }, null);
-                    }
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
-                    logger.debug("Encountered exception on removing watches from {}", path, e);
-                }
                 childWatches.remove(path, watchers);
             }
         }
diff --git a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
index 4ef6a78bc0..fd9e4a7d8a 100644
--- a/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
+++ b/tests/integration-tests-topologies/src/main/java/org/apache/bookkeeper/tests/containers/ZKContainer.java
@@ -22,7 +22,7 @@
 
 import java.time.Duration;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.bookkeeper.tests.containers.wait.HttpWaitStrategy;
+import org.apache.bookkeeper.tests.containers.wait.ZKWaitStrategy;
 
 /**
  * Test container that runs zookeeper.
@@ -62,11 +62,8 @@ protected void configure() {
 
     @Override
     public void start() {
-        this.waitStrategy = new HttpWaitStrategy()
-            .forPath("/commands/ruok")
-            .forStatusCode(200)
-            .forPort(ZK_HTTP_PORT)
-            .withStartupTimeout(Duration.of(60, SECONDS));
+        this.waitStrategy = new ZKWaitStrategy(ZK_PORT)
+                .withStartupTimeout(Duration.of(60, SECONDS));
 
         this.withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withHostName(HOST_NAME));
 
diff --git a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
index b0f4e00806..8d025964b0 100644
--- a/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
+++ b/tests/integration-tests-utils/src/main/java/org/apache/bookkeeper/tests/integration/utils/BookKeeperClusterUtils.java
@@ -28,6 +28,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import lombok.Cleanup;
+
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -79,21 +81,21 @@ public static boolean zookeeperRunning(String ip, int port) {
     }
 
     public static void legacyMetadataFormat(DockerClient docker) throws Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        zk.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
     }
 
     public static boolean metadataFormatIfNeeded(DockerClient docker, String version) throws Exception {
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists("/ledgers", false) == null) {
-                String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
-                runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", "-nonInteractive");
-                return true;
-            } else {
-                return false;
-            }
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists("/ledgers", false) == null) {
+            String bookkeeper = "/opt/bookkeeper/" + version + "/bin/bookkeeper";
+            runOnAnyBookie(docker, bookkeeper, "shell", "metaformat", "-nonInteractive");
+            return true;
+        } else {
+            return false;
         }
     }
 
@@ -102,17 +104,18 @@ public static String createDlogNamespaceIfNeeded(DockerClient docker,
                                                      String namespace) throws Exception {
         String zkServers = BookKeeperClusterUtils.zookeeperConnectString(docker);
         String dlogUri = "distributedlog://" + zkServers + namespace;
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
-            if (zk.exists(namespace, false) == null) {
-                String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
-
-                runOnAnyBookie(docker, dlog,
-                    "admin",
-                    "bind",
-                    "-l", "/ledgers",
-                    "-s", zkServers,
-                    "-c", dlogUri);
-            }
+
+        @Cleanup
+        ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
+        if (zk.exists(namespace, false) == null) {
+            String dlog = "/opt/bookkeeper/" + version + "/bin/dlog";
+
+            runOnAnyBookie(docker, dlog,
+                "admin",
+                "bind",
+                "-l", "/ledgers",
+                "-s", zkServers,
+                "-c", dlogUri);
         }
         return dlogUri;
     }
@@ -169,7 +172,9 @@ private static boolean waitBookieState(DockerClient docker, String containerId,
         long timeoutMillis = timeoutUnit.toMillis(timeout);
         long pollMillis = 1000;
         String bookieId = DockerUtils.getContainerIP(docker, containerId) + ":3181";
-        try (ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker)) {
+        try {
+            @Cleanup
+            ZooKeeper zk = BookKeeperClusterUtils.zookeeperClient(docker);
             String path = "/ledgers/available/" + bookieId;
             while (timeoutMillis > 0) {
                 if ((zk.exists(path, false) != null) == upOrDown) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services