You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/05/26 12:18:09 UTC

[pulsar] branch master updated: Clean up deprecated Zookeeper client (#4306)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e17506d  Clean up deprecated Zookeeper client (#4306)
e17506d is described below

commit e17506d3ea2088b221d33dfd0ac9666d2ae4313a
Author: Like <ke...@outlook.com>
AuthorDate: Sun May 26 20:18:04 2019 +0800

    Clean up deprecated Zookeeper client (#4306)
    
    The Zookeeper client related classes are already deprecated and not used any more. We can clean up now.
---
 .../bookkeeper/zookeeper/BkZooKeeperClient.java    | 1365 --------------------
 .../apache/bookkeeper/zookeeper/BkZooWorker.java   |  162 ---
 .../util/BkReadOnlyZookeeperClientFactoryImpl.java |   74 --
 3 files changed, 1601 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
deleted file mode 100644
index be60294..0000000
--- a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooKeeperClient.java
+++ /dev/null
@@ -1,1365 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.zookeeper;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.BkZooWorker.ZooCallable;
-import org.apache.zookeeper.AsyncCallback.ACLCallback;
-import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.AsyncCallback.MultiCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Op;
-import org.apache.zookeeper.OpResult;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides a ZooKeeper client that handles session expiration.
- *
- * Note this is copied from BK ZooKeeperClient class. The only addition here is that it allows specifying read-only mode
- * which we use when connecting to global ZK.
- *
- * TODO: Remove this class once BK-4.8 is released to include read-only in ZooKeeperClient.
- */
-public class BkZooKeeperClient extends ZooKeeper implements Watcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(BkZooKeeperClient.class);
-
-    private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1;
-
-    // ZooKeeper client connection variables
-    private final String connectString;
-    private final int sessionTimeoutMs;
-    private final boolean allowReadOnlyMode;
-
-    // state for the zookeeper client
-    private final AtomicReference<ZooKeeper> zk = new AtomicReference<ZooKeeper>();
-    private final AtomicBoolean closed = new AtomicBoolean(false);
-    private final ZooKeeperWatcherBase watcherManager;
-
-    private final ScheduledExecutorService retryExecutor;
-    private final ExecutorService connectExecutor;
-
-    // rate limiter
-    private final RateLimiter rateLimiter;
-
-    // retry policies
-    private final RetryPolicy connectRetryPolicy;
-    private final RetryPolicy operationRetryPolicy;
-
-    // Stats Logger
-    private final OpStatsLogger createStats;
-    private final OpStatsLogger getStats;
-    private final OpStatsLogger setStats;
-    private final OpStatsLogger deleteStats;
-    private final OpStatsLogger getChildrenStats;
-    private final OpStatsLogger existsStats;
-    private final OpStatsLogger multiStats;
-    private final OpStatsLogger getACLStats;
-    private final OpStatsLogger setACLStats;
-    private final OpStatsLogger syncStats;
-    private final OpStatsLogger createClientStats;
-
-    private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
-
-        @Override
-        public ZooKeeper call() throws Exception {
-            try {
-                return BkZooWorker.syncCallWithRetries(null, new ZooCallable<ZooKeeper>() {
-
-                    @Override
-                    public ZooKeeper call() throws KeeperException, InterruptedException {
-                        logger.info("Reconnecting zookeeper {}.", connectString);
-                        // close the previous one
-                        closeZkHandle();
-                        ZooKeeper newZk;
-                        try {
-                            newZk = createZooKeeper();
-                        } catch (IOException ie) {
-                            logger.error("Failed to create zookeeper instance to " + connectString, ie);
-                            throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
-                        }
-                        waitForConnection();
-                        zk.set(newZk);
-                        logger.info("ZooKeeper session {} is created to {}.",
-                                Long.toHexString(newZk.getSessionId()), connectString);
-                        return newZk;
-                    }
-
-                    @Override
-                    public String toString() {
-                        return String.format("ZooKeeper Client Creator (%s)", connectString);
-                    }
-
-                }, connectRetryPolicy, rateLimiter, createClientStats);
-            } catch (Exception e) {
-                logger.error("Gave up reconnecting to ZooKeeper : ", e);
-                Runtime.getRuntime().exit(-1);
-                return null;
-            }
-        }
-
-    };
-
-    @VisibleForTesting
-    static BkZooKeeperClient createConnectedZooKeeperClient(
-            String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
-            RetryPolicy operationRetryPolicy)
-                    throws KeeperException, InterruptedException, IOException {
-        return BkZooKeeperClient.newBuilder()
-                .connectString(connectString)
-                .sessionTimeoutMs(sessionTimeoutMs)
-                .watchers(childWatchers)
-                .operationRetryPolicy(operationRetryPolicy)
-                .build();
-    }
-
-    /**
-     * A builder to build retryable zookeeper client.
-     */
-    public static class Builder {
-        String connectString = null;
-        int sessionTimeoutMs = 10000;
-        Set<Watcher> watchers = null;
-        RetryPolicy connectRetryPolicy = null;
-        RetryPolicy operationRetryPolicy = null;
-        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
-        int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
-        double requestRateLimit = 0;
-        boolean allowReadOnlyMode = false;
-
-        private Builder() {}
-
-        public Builder connectString(String connectString) {
-            this.connectString = connectString;
-            return this;
-        }
-
-        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
-            this.sessionTimeoutMs = sessionTimeoutMs;
-            return this;
-        }
-
-        public Builder watchers(Set<Watcher> watchers) {
-            this.watchers = watchers;
-            return this;
-        }
-
-        public Builder connectRetryPolicy(RetryPolicy retryPolicy) {
-            this.connectRetryPolicy = retryPolicy;
-            return this;
-        }
-
-        public Builder operationRetryPolicy(RetryPolicy retryPolicy) {
-            this.operationRetryPolicy = retryPolicy;
-            return this;
-        }
-
-        public Builder statsLogger(StatsLogger statsLogger) {
-            this.statsLogger = statsLogger;
-            return this;
-        }
-
-        public Builder requestRateLimit(double requestRateLimit) {
-            this.requestRateLimit = requestRateLimit;
-            return this;
-        }
-
-        public Builder retryThreadCount(int numThreads) {
-            this.retryExecThreadCount = numThreads;
-            return this;
-        }
-
-        public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
-            this.allowReadOnlyMode = allowReadOnlyMode;
-            return this;
-        }
-
-        public BkZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
-            checkNotNull(connectString);
-            checkArgument(sessionTimeoutMs > 0);
-            checkNotNull(statsLogger);
-            checkArgument(retryExecThreadCount > 0);
-
-            if (null == connectRetryPolicy) {
-                connectRetryPolicy =
-                        new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
-            }
-            if (null == operationRetryPolicy) {
-                operationRetryPolicy =
-                        new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0);
-            }
-
-            // Create a watcher manager
-            StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
-            ZooKeeperWatcherBase watcherManager =
-                    null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
-                            new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
-            BkZooKeeperClient client = new BkZooKeeperClient(
-                    connectString,
-                    sessionTimeoutMs,
-                    watcherManager,
-                    connectRetryPolicy,
-                    operationRetryPolicy,
-                    statsLogger,
-                    retryExecThreadCount,
-                    requestRateLimit,
-                    allowReadOnlyMode
-            );
-            // Wait for connection to be established.
-            try {
-                watcherManager.waitForConnection();
-            } catch (KeeperException ke) {
-                client.close();
-                throw ke;
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-                client.close();
-                throw ie;
-            }
-            return client;
-        }
-    }
-
-    public static Builder newBuilder() {
-        return new Builder();
-    }
-
-    BkZooKeeperClient(String connectString,
-                    int sessionTimeoutMs,
-                    ZooKeeperWatcherBase watcherManager,
-                    RetryPolicy connectRetryPolicy,
-                    RetryPolicy operationRetryPolicy,
-                    StatsLogger statsLogger,
-                    int retryExecThreadCount,
-                    double rate,
-                    boolean allowReadOnlyMode) throws IOException {
-        super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
-        this.connectString = connectString;
-        this.sessionTimeoutMs = sessionTimeoutMs;
-        this.allowReadOnlyMode =  allowReadOnlyMode;
-        this.watcherManager = watcherManager;
-        this.connectRetryPolicy = connectRetryPolicy;
-        this.operationRetryPolicy = operationRetryPolicy;
-        this.rateLimiter = rate > 0 ? RateLimiter.create(rate) : null;
-        this.retryExecutor =
-                Executors.newScheduledThreadPool(retryExecThreadCount,
-                    new ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build());
-        this.connectExecutor =
-                Executors.newSingleThreadExecutor(
-                    new ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build());
-        // register this client as a child watcher of the watcher manager
-        watcherManager.addChildWatcher(this);
-
-        // Stats
-        StatsLogger scopedStatsLogger = statsLogger.scope("zk");
-        createClientStats = scopedStatsLogger.getOpStatsLogger("create_client");
-        createStats = scopedStatsLogger.getOpStatsLogger("create");
-        getStats = scopedStatsLogger.getOpStatsLogger("get_data");
-        setStats = scopedStatsLogger.getOpStatsLogger("set_data");
-        deleteStats = scopedStatsLogger.getOpStatsLogger("delete");
-        getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children");
-        existsStats = scopedStatsLogger.getOpStatsLogger("exists");
-        multiStats = scopedStatsLogger.getOpStatsLogger("multi");
-        getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl");
-        setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl");
-        syncStats = scopedStatsLogger.getOpStatsLogger("sync");
-    }
-
-    @Override
-    public void close() throws InterruptedException {
-        closed.set(true);
-        connectExecutor.shutdown();
-        retryExecutor.shutdown();
-        closeZkHandle();
-    }
-
-    private void closeZkHandle() throws InterruptedException {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            super.close();
-        } else {
-            zkHandle.close();
-        }
-    }
-
-    protected void waitForConnection() throws KeeperException, InterruptedException {
-        watcherManager.waitForConnection();
-    }
-
-    protected ZooKeeper createZooKeeper() throws IOException {
-        return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        if (event.getType() == EventType.None
-            && event.getState() == KeeperState.Expired) {
-            onExpired();
-        }
-    }
-
-    private void onExpired() {
-        if (closed.get()) {
-            // do not schedule a reconnect if the client is already closed.
-            return;
-        }
-
-        logger.info("ZooKeeper session {} is expired from {}.",
-                Long.toHexString(getSessionId()), connectString);
-        try {
-            connectExecutor.submit(clientCreator);
-        } catch (RejectedExecutionException ree) {
-            if (!closed.get()) {
-                logger.error("ZooKeeper reconnect task is rejected : ", ree);
-            }
-        } catch (Exception t) {
-            logger.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t);
-        }
-    }
-
-    /**
-     * A runnable that retries zookeeper operations.
-     */
-    abstract static class ZkRetryRunnable implements Runnable {
-
-        final BkZooWorker worker;
-        final RateLimiter rateLimiter;
-        final Runnable that;
-
-        ZkRetryRunnable(RetryPolicy retryPolicy,
-                        RateLimiter rateLimiter,
-                        OpStatsLogger statsLogger) {
-            this.worker = new BkZooWorker(retryPolicy, statsLogger);
-            this.rateLimiter = rateLimiter;
-            that = this;
-        }
-
-        @Override
-        public void run() {
-            if (null != rateLimiter) {
-                rateLimiter.acquire();
-            }
-            zkRun();
-        }
-
-        abstract void zkRun();
-    }
-
-    // inherits from ZooKeeper client for all operations
-
-    @Override
-    public long getSessionId() {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return super.getSessionId();
-        }
-        return zkHandle.getSessionId();
-    }
-
-    @Override
-    public byte[] getSessionPasswd() {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return super.getSessionPasswd();
-        }
-        return zkHandle.getSessionPasswd();
-    }
-
-    @Override
-    public int getSessionTimeout() {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return super.getSessionTimeout();
-        }
-        return zkHandle.getSessionTimeout();
-    }
-
-    @Override
-    public void addAuthInfo(String scheme, byte[] auth) {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            super.addAuthInfo(scheme, auth);
-            return;
-        }
-        zkHandle.addAuthInfo(scheme, auth);
-    }
-
-    private void backOffAndRetry(Runnable r, long nextRetryWaitTimeMs) {
-        try {
-            retryExecutor.schedule(r, nextRetryWaitTimeMs, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            if (!closed.get()) {
-                logger.error("ZooKeeper Operation {} is rejected : ", r, ree);
-            }
-        }
-    }
-
-    private boolean allowRetry(BkZooWorker worker, int rc) {
-        return worker.allowRetry(rc) && !closed.get();
-    }
-
-    @Override
-    public synchronized void register(Watcher watcher) {
-        watcherManager.addChildWatcher(watcher);
-    }
-
-    @Override
-    public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<OpResult>>() {
-
-            @Override
-            public String toString() {
-                return "multi";
-            }
-
-            @Override
-            public List<OpResult> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.multi(ops);
-                }
-                return zkHandle.multi(ops);
-            }
-
-        }, operationRetryPolicy, rateLimiter, multiStats);
-    }
-
-    @Override
-    public void multi(final Iterable<Op> ops,
-                      final MultiCallback cb,
-                      final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
-
-            final MultiCallback multiCb = new MultiCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, results);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.multi(ops, multiCb, worker);
-                } else {
-                    zkHandle.multi(ops, multiCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return "multi";
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    @Deprecated
-    public Transaction transaction() {
-        // There is no way to track which client a transaction would use,
-        // so just delegate to the current ZooKeeper instance directly.
-        // Prefer {@link #multi} instead.
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return super.transaction();
-        }
-        return zkHandle.transaction();
-    }
-
-    @Override
-    public List<ACL> getACL(final String path, final Stat stat) throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<ACL>>() {
-
-            @Override
-            public String toString() {
-                return String.format("getACL (%s, stat = %s)", path, stat);
-            }
-
-            @Override
-            public List<ACL> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getACL(path, stat);
-                }
-                return zkHandle.getACL(path, stat);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getACLStats);
-    }
-
-    @Override
-    public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) {
-
-            final ACLCallback aclCb = new ACLCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, acl, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            public String toString() {
-                return String.format("getACL (%s, stat = %s)", path, stat);
-            }
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getACL(path, stat, aclCb, worker);
-                } else {
-                    zkHandle.getACL(path, stat, aclCb, worker);
-                }
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public Stat setACL(final String path, final List<ACL> acl, final int version)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
-
-            @Override
-            public String toString() {
-                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
-            }
-
-            @Override
-            public Stat call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.setACL(path, acl, version);
-                }
-                return zkHandle.setACL(path, acl, version);
-            }
-
-        }, operationRetryPolicy, rateLimiter, setACLStats);
-    }
-
-    @Override
-    public void setACL(final String path, final List<ACL> acl, final int version,
-            final StatCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) {
-
-            final StatCallback stCb = new StatCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            public String toString() {
-                return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
-            }
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.setACL(path, acl, version, stCb, worker);
-                } else {
-                    zkHandle.setACL(path, acl, version, stCb, worker);
-                }
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void sync(final String path, final VoidCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) {
-
-            final VoidCallback vCb = new VoidCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context);
-                    }
-                }
-
-            };
-
-            @Override
-            public String toString() {
-                return String.format("sync (%s)", path);
-            }
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.sync(path, vCb, worker);
-                } else {
-                    zkHandle.sync(path, vCb, worker);
-                }
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public States getState() {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return BkZooKeeperClient.super.getState();
-        } else {
-            return zkHandle.getState();
-        }
-    }
-
-    @Override
-    public String toString() {
-        ZooKeeper zkHandle = zk.get();
-        if (null == zkHandle) {
-            return BkZooKeeperClient.super.toString();
-        } else {
-            return zkHandle.toString();
-        }
-    }
-
-    @Override
-    public String create(final String path, final byte[] data,
-            final List<ACL> acl, final CreateMode createMode)
-                    throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
-
-            @Override
-            public String call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.create(path, data, acl, createMode);
-                }
-                return zkHandle.create(path, data, acl, createMode);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-
-        }, operationRetryPolicy, rateLimiter, createStats);
-    }
-
-    @Override
-    public void create(final String path, final byte[] data, final List<ACL> acl,
-            final CreateMode createMode, final StringCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
-
-            final StringCallback createCb = new StringCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, String name) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, name);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
-                } else {
-                    zkHandle.create(path, data, acl, createMode, createCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void delete(final String path, final int version) throws KeeperException, InterruptedException {
-        BkZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
-
-            @Override
-            public Void call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.delete(path, version);
-                } else {
-                    zkHandle.delete(path, version);
-                }
-                return null;
-            }
-
-            @Override
-            public String toString() {
-                return String.format("delete (%s, version = %d)", path, version);
-            }
-
-        }, operationRetryPolicy, rateLimiter, deleteStats);
-    }
-
-    @Override
-    public void delete(final String path, final int version, final VoidCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) {
-
-            final VoidCallback deleteCb = new VoidCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.delete(path, version, deleteCb, worker);
-                } else {
-                    zkHandle.delete(path, version, deleteCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("delete (%s, version = %d)", path, version);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
-
-            @Override
-            public Stat call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.exists(path, watcher);
-                }
-                return zkHandle.exists(path, watcher);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("exists (%s, watcher = %s)", path, watcher);
-            }
-
-        }, operationRetryPolicy, rateLimiter, existsStats);
-    }
-
-    @Override
-    public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
-
-            @Override
-            public Stat call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.exists(path, watch);
-                }
-                return zkHandle.exists(path, watch);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("exists (%s, watcher = %s)", path, watch);
-            }
-
-        }, operationRetryPolicy, rateLimiter, existsStats);
-    }
-
-    @Override
-    public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
-
-            final StatCallback stCb = new StatCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.exists(path, watcher, stCb, worker);
-                } else {
-                    zkHandle.exists(path, watcher, stCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("exists (%s, watcher = %s)", path, watcher);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
-
-            final StatCallback stCb = new StatCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.exists(path, watch, stCb, worker);
-                } else {
-                    zkHandle.exists(path, watch, stCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("exists (%s, watcher = %s)", path, watch);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public byte[] getData(final String path, final Watcher watcher, final Stat stat)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
-
-            @Override
-            public byte[] call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getData(path, watcher, stat);
-                }
-                return zkHandle.getData(path, watcher, stat);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getData (%s, watcher = %s)", path, watcher);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getStats);
-    }
-
-    @Override
-    public byte[] getData(final String path, final boolean watch, final Stat stat)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
-
-            @Override
-            public byte[] call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getData(path, watch, stat);
-                }
-                return zkHandle.getData(path, watch, stat);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getData (%s, watcher = %s)", path, watch);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getStats);
-    }
-
-    @Override
-    public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
-
-            final DataCallback dataCb = new DataCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, data, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getData(path, watcher, dataCb, worker);
-                } else {
-                    zkHandle.getData(path, watcher, dataCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getData (%s, watcher = %s)", path, watcher);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
-
-            final DataCallback dataCb = new DataCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, data, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getData(path, watch, dataCb, worker);
-                } else {
-                    zkHandle.getData(path, watch, dataCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getData (%s, watcher = %s)", path, watch);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public Stat setData(final String path, final byte[] data, final int version)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
-
-            @Override
-            public Stat call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.setData(path, data, version);
-                }
-                return zkHandle.setData(path, data, version);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("setData (%s, version = %d)", path, version);
-            }
-
-        }, operationRetryPolicy, rateLimiter, setStats);
-    }
-
-    @Override
-    public void setData(final String path, final byte[] data, final int version,
-            final StatCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
-
-            final StatCallback stCb = new StatCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.setData(path, data, version, stCb, worker);
-                } else {
-                    zkHandle.setData(path, data, version, stCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("setData (%s, version = %d)", path, version);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public List<String> getChildren(final String path, final Watcher watcher, final Stat stat)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
-
-            @Override
-            public List<String> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getChildren(path, watcher, stat);
-                }
-                return zkHandle.getChildren(path, watcher, stat);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watcher);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getChildrenStats);
-    }
-
-    @Override
-    public List<String> getChildren(final String path, final boolean watch, final Stat stat)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
-
-            @Override
-            public List<String> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getChildren(path, watch, stat);
-                }
-                return zkHandle.getChildren(path, watch, stat);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watch);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getChildrenStats);
-    }
-
-    @Override
-    public void getChildren(final String path, final Watcher watcher,
-            final Children2Callback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
-
-            final Children2Callback childCb = new Children2Callback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx,
-                        List<String> children, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, children, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
-                } else {
-                    zkHandle.getChildren(path, watcher, childCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watcher);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void getChildren(final String path, final boolean watch, final Children2Callback cb,
-            final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
-
-            final Children2Callback childCb = new Children2Callback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx,
-                        List<String> children, Stat stat) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, children, stat);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getChildren(path, watch, childCb, worker);
-                } else {
-                    zkHandle.getChildren(path, watch, childCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watch);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-
-    @Override
-    public List<String> getChildren(final String path, final Watcher watcher)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
-
-            @Override
-            public List<String> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getChildren(path, watcher);
-                }
-                return zkHandle.getChildren(path, watcher);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watcher);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getChildrenStats);
-    }
-
-    @Override
-    public List<String> getChildren(final String path, final boolean watch)
-            throws KeeperException, InterruptedException {
-        return BkZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
-
-            @Override
-            public List<String> call() throws KeeperException, InterruptedException {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    return BkZooKeeperClient.super.getChildren(path, watch);
-                }
-                return zkHandle.getChildren(path, watch);
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watch);
-            }
-
-        }, operationRetryPolicy, rateLimiter, getChildrenStats);
-    }
-
-    @Override
-    public void getChildren(final String path, final Watcher watcher,
-            final ChildrenCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
-
-            final ChildrenCallback childCb = new ChildrenCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx,
-                        List<String> children) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, children);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
-                } else {
-                    zkHandle.getChildren(path, watcher, childCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watcher);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-
-    @Override
-    public void getChildren(final String path, final boolean watch,
-            final ChildrenCallback cb, final Object context) {
-        final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
-
-            final ChildrenCallback childCb = new ChildrenCallback() {
-
-                @Override
-                public void processResult(int rc, String path, Object ctx,
-                        List<String> children) {
-                    BkZooWorker worker = (BkZooWorker) ctx;
-                    if (allowRetry(worker, rc)) {
-                        backOffAndRetry(that, worker.nextRetryWaitTime());
-                    } else {
-                        cb.processResult(rc, path, context, children);
-                    }
-                }
-
-            };
-
-            @Override
-            void zkRun() {
-                ZooKeeper zkHandle = zk.get();
-                if (null == zkHandle) {
-                    BkZooKeeperClient.super.getChildren(path, watch, childCb, worker);
-                } else {
-                    zkHandle.getChildren(path, watch, childCb, worker);
-                }
-            }
-
-            @Override
-            public String toString() {
-                return String.format("getChildren (%s, watcher = %s)", path, watch);
-            }
-        };
-        // execute it immediately
-        proc.run();
-    }
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java b/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
deleted file mode 100644
index 634a61e..0000000
--- a/pulsar-proxy/src/main/java/org/apache/bookkeeper/zookeeper/BkZooWorker.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.bookkeeper.zookeeper;
-
-import com.google.common.util.concurrent.RateLimiter;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide a mechanism to perform an operation on ZooKeeper that is safe on disconnections
- * and recoverable errors.
- *
- * TODO: Remove this class once BK-4.8 is released to include read-only in ZooKeeperClient.
- */
-class BkZooWorker {
-
-    private static final Logger logger = LoggerFactory.getLogger(BkZooWorker.class);
-
-    int attempts = 0;
-    long startTimeNanos;
-    long elapsedTimeMs = 0L;
-    final RetryPolicy retryPolicy;
-    final OpStatsLogger statsLogger;
-
-    BkZooWorker(RetryPolicy retryPolicy, OpStatsLogger statsLogger) {
-        this.retryPolicy = retryPolicy;
-        this.statsLogger = statsLogger;
-        this.startTimeNanos = MathUtils.nowInNano();
-    }
-
-    public boolean allowRetry(int rc) {
-        elapsedTimeMs = MathUtils.elapsedMSec(startTimeNanos);
-        if (!BkZooWorker.isRecoverableException(rc)) {
-            if (KeeperException.Code.OK.intValue() == rc) {
-                statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
-            } else {
-                statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
-            }
-            return false;
-        }
-        ++attempts;
-        return retryPolicy.allowRetry(attempts, elapsedTimeMs);
-    }
-
-    public long nextRetryWaitTime() {
-        return retryPolicy.nextRetryWaitTime(attempts, elapsedTimeMs);
-    }
-
-    /**
-     * Check whether the given result code is recoverable by retry.
-     *
-     * @param rc result code
-     * @return true if given result code is recoverable.
-     */
-    public static boolean isRecoverableException(int rc) {
-        return KeeperException.Code.CONNECTIONLOSS.intValue() == rc
-            || KeeperException.Code.OPERATIONTIMEOUT.intValue() == rc
-            || KeeperException.Code.SESSIONMOVED.intValue() == rc
-            || KeeperException.Code.SESSIONEXPIRED.intValue() == rc;
-    }
-
-    /**
-     * Check whether the given exception is recoverable by retry.
-     *
-     * @param exception given exception
-     * @return true if given exception is recoverable.
-     */
-    public static boolean isRecoverableException(KeeperException exception) {
-        return isRecoverableException(exception.code().intValue());
-    }
-
-    interface ZooCallable<T> {
-        /**
-         * Be compatible with ZooKeeper interface.
-         *
-         * @return value
-         * @throws InterruptedException
-         * @throws KeeperException
-         */
-        T call() throws InterruptedException, KeeperException;
-    }
-
-    /**
-     * Execute a sync zookeeper operation with a given retry policy.
-     *
-     * @param client
-     *          ZooKeeper client.
-     * @param proc
-     *          Synchronous zookeeper operation wrapped in a {@link Callable}.
-     * @param retryPolicy
-     *          Retry policy to execute the synchronous operation.
-     * @param rateLimiter
-     *          Rate limiter for zookeeper calls
-     * @param statsLogger
-     *          Stats Logger for zookeeper client.
-     * @return result of the zookeeper operation
-     * @throws KeeperException any non-recoverable exception or recoverable exception exhausted all retires.
-     * @throws InterruptedException the operation is interrupted.
-     */
-    public static<T> T syncCallWithRetries(BkZooKeeperClient client,
-                                           ZooCallable<T> proc,
-                                           RetryPolicy retryPolicy,
-                                           RateLimiter rateLimiter,
-                                           OpStatsLogger statsLogger)
-    throws KeeperException, InterruptedException {
-        T result = null;
-        boolean isDone = false;
-        int attempts = 0;
-        long startTimeNanos = MathUtils.nowInNano();
-        while (!isDone) {
-            try {
-                if (null != client) {
-                    client.waitForConnection();
-                }
-                logger.debug("Execute {} at {} retry attempt.", proc, attempts);
-                if (null != rateLimiter) {
-                    rateLimiter.acquire();
-                }
-                result = proc.call();
-                isDone = true;
-                statsLogger.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
-            } catch (KeeperException e) {
-                ++attempts;
-                boolean rethrow = true;
-                long elapsedTime = MathUtils.elapsedMSec(startTimeNanos);
-                if (((null != client && isRecoverableException(e)) || null == client)
-                    && retryPolicy.allowRetry(attempts, elapsedTime)) {
-                    rethrow = false;
-                }
-                if (rethrow) {
-                    statsLogger.registerFailedEvent(MathUtils.elapsedMicroSec(startTimeNanos), TimeUnit.MICROSECONDS);
-                    logger.debug("Stopped executing {} after {} attempts.", proc, attempts);
-                    throw e;
-                }
-                TimeUnit.MILLISECONDS.sleep(retryPolicy.nextRetryWaitTime(attempts, elapsedTime));
-            }
-        }
-        return result;
-    }
-
-}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
deleted file mode 100644
index 69452fc..0000000
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/BkReadOnlyZookeeperClientFactoryImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.proxy.server.util;
-
-import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.zookeeper.BkZooKeeperClient;
-import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
-
-@Slf4j
-public class BkReadOnlyZookeeperClientFactoryImpl implements ZooKeeperClientFactory {
-
-    private final OrderedExecutor executor;
-
-    public BkReadOnlyZookeeperClientFactoryImpl(OrderedExecutor executor) {
-        this.executor = executor;
-    }
-
-    @Override
-    public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType, int zkSessionTimeoutMillis) {
-        CompletableFuture<ZooKeeper> future = new CompletableFuture<>();
-
-        executor.execute(safeRun(() -> {
-            try {
-                ZooKeeper zk = BkZooKeeperClient.newBuilder().connectString(serverList)
-                        .sessionTimeoutMs(zkSessionTimeoutMillis)
-                        .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(zkSessionTimeoutMillis,
-                                zkSessionTimeoutMillis, 0))
-                        .allowReadOnlyMode(sessionType == SessionType.AllowReadOnly).build();
-
-                if (zk.getState() == States.CONNECTEDREADONLY && sessionType != SessionType.AllowReadOnly) {
-                    future.completeExceptionally(new IllegalStateException("Cannot use a read-only session"));
-                }
-
-                log.info("ZooKeeper session established: {}", zk);
-                future.complete(zk);
-            } catch (IOException | KeeperException | InterruptedException exception) {
-                log.error("Failed to establish ZooKeeper session: {}", exception.getMessage());
-                future.completeExceptionally(exception);
-            }
-        }, throwable -> {
-            future.completeExceptionally(throwable);
-        }));
-
-        return future;
-    }
-
-}