You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by mm...@apache.org on 2021/10/23 04:21:34 UTC

[bookkeeper] branch master updated: Remove direct ZK access for Auditor (#2842)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 746f9f6  Remove direct ZK access for Auditor (#2842)
746f9f6 is described below

commit 746f9f6ff5f54203a6f1c25b7d0f02642455ea04
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Oct 22 21:21:27 2021 -0700

    Remove direct ZK access for Auditor (#2842)
    
    * Remove direct ZK access for Auditor
    
    * Fixed unused imports
    
    * Fixed checkstyle
    
    * Fixed checkstyle in tests
---
 .../org/apache/bookkeeper/client/BookKeeper.java   |  10 +-
 .../apache/bookkeeper/client/BookKeeperAdmin.java  |  27 +-
 .../bookkeeper/meta/FlatLedgerManagerFactory.java  |   8 +
 .../bookkeeper/meta/LedgerAuditorManager.java      |  52 ++++
 .../bookkeeper/meta/LedgerManagerFactory.java      |  10 +
 .../LegacyHierarchicalLedgerManagerFactory.java    |   8 +
 .../bookkeeper/meta/MSLedgerManagerFactory.java    |   8 +
 .../bookkeeper/meta/ZkLedgerAuditorManager.java    | 279 +++++++++++++++++++++
 .../bookkeeper/replication/AuditorElector.java     | 265 ++++---------------
 .../server/http/BKHttpServiceProvider.java         |  17 +-
 .../server/http/service/WhoIsAuditorService.java   |  13 +-
 .../commands/autorecovery/WhoIsAuditorCommand.java |  52 ++--
 .../bookkeeper/client/BookKeeperTestClient.java    |  12 +-
 .../apache/bookkeeper/client/MockBookKeeper.java   |   5 -
 .../bookkeeper/client/TestBookieWatcher.java       |   2 +-
 .../replication/AuditorRollingRestartTest.java     |   6 +-
 .../replication/AutoRecoveryMainTest.java          |  51 ++--
 .../autorecovery/WhoIsAuditorCommandTest.java      |  13 +-
 18 files changed, 519 insertions(+), 319 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index c34defa..fdefd28 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -81,7 +81,6 @@ import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.MetadataClientDriver;
 import org.apache.bookkeeper.meta.MetadataDrivers;
 import org.apache.bookkeeper.meta.exceptions.MetadataException;
-import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieAddressResolver;
@@ -636,6 +635,11 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
     }
 
     @VisibleForTesting
+    public LedgerManagerFactory getLedgerManagerFactory() {
+        return ledgerManagerFactory;
+    }
+
+    @VisibleForTesting
     LedgerManager getUnderlyingLedgerManager() {
         return ((CleanupLedgerManager) ledgerManager).getUnderlying();
     }
@@ -743,10 +747,6 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
         }
     }
 
-    ZooKeeper getZkHandle() {
-        return ((ZKMetadataClientDriver) metadataDriver).getZk();
-    }
-
     protected ClientConfiguration getConf() {
         return conf;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
index 378df4b..91e2552 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java
@@ -64,6 +64,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.BookieServiceInfo;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -75,7 +76,6 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
-import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.replication.BookieLedgerIndexer;
 import org.apache.bookkeeper.replication.ReplicationException.BKAuditException;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
@@ -113,6 +113,8 @@ public class BookKeeperAdmin implements AutoCloseable {
      */
     private LedgerUnderreplicationManager underreplicationManager;
 
+    private LedgerAuditorManager ledgerAuditorManager;
+
     /**
      * Constructor that takes in a ZooKeeper servers connect string so we know
      * how to connect to ZooKeeper to retrieve information about the BookKeeper
@@ -198,6 +200,14 @@ public class BookKeeperAdmin implements AutoCloseable {
         if (ownsBK) {
             bkc.close();
         }
+
+        if (ledgerAuditorManager != null) {
+            try {
+                ledgerAuditorManager.close();
+            } catch (Exception e) {
+                throw new BKException.MetaStoreException(e);
+            }
+        }
     }
 
     /**
@@ -1401,6 +1411,14 @@ public class BookKeeperAdmin implements AutoCloseable {
         return underreplicationManager;
     }
 
+    private LedgerAuditorManager getLedgerAuditorManager()
+            throws IOException, InterruptedException {
+        if (ledgerAuditorManager == null) {
+            ledgerAuditorManager = mFactory.newLedgerAuditorManager();
+        }
+        return ledgerAuditorManager;
+    }
+
     /**
      * Setter for LostBookieRecoveryDelay value (in seconds) in Zookeeper.
      *
@@ -1452,8 +1470,7 @@ public class BookKeeperAdmin implements AutoCloseable {
             throw new UnavailableException("Autorecovery is disabled. So giving up!");
         }
 
-        BookieId auditorId =
-            AuditorElector.getCurrentAuditor(new ServerConfiguration(bkc.getConf()), bkc.getZkHandle());
+        BookieId auditorId = getLedgerAuditorManager().getCurrentAuditor();
         if (auditorId == null) {
             LOG.error("No auditor elected, though Autorecovery is enabled. So giving up.");
             throw new UnavailableException("No auditor elected, though Autorecovery is enabled. So giving up.");
@@ -1706,4 +1723,8 @@ public class BookKeeperAdmin implements AutoCloseable {
             long ledgerId) {
         return bkc.getBookieClient().getListOfEntriesOfLedger(address, ledgerId);
     }
+
+    public BookieId getCurrentAuditor() throws IOException, InterruptedException {
+        return getLedgerAuditorManager().getCurrentAuditor();
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
index 19ac418..e613082 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
@@ -87,4 +89,10 @@ public class FlatLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
         return new ZkLedgerUnderreplicationManager(conf, zk);
     }
+
+    @Override
+    public LedgerAuditorManager newLedgerAuditorManager() {
+        ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+        return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java
new file mode 100644
index 0000000..b1b2fa0
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerAuditorManager.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.bookkeeper.net.BookieId;
+
+/**
+ * Interface to handle the ledger auditor election.
+ */
+public interface LedgerAuditorManager extends AutoCloseable {
+
+    /**
+     * Events that can be triggered by the LedgerAuditorManager.
+     */
+    enum AuditorEvent {
+        SessionLost,
+        VoteWasDeleted,
+    }
+
+    /**
+     * Try to become the auditor. If another auditor is already elected, this call blocks
+     * until the current instance has become the auditor.
+     *
+     * @param bookieId the identifier for current bookie
+     * @param listener listener that will receive AuditorEvent notifications
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    void tryToBecomeAuditor(String bookieId, Consumer<AuditorEvent> listener) throws IOException, InterruptedException;
+
+    /**
+     * Return the information regarding the current auditor.
+     * @return the id of the current auditor, or {@code null} if no auditor is currently elected
+     */
+    BookieId getCurrentAuditor() throws IOException, InterruptedException;
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 80d3a65..d213235 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -86,6 +86,16 @@ public interface LedgerManagerFactory extends AutoCloseable {
     LedgerUnderreplicationManager newLedgerUnderreplicationManager()
             throws KeeperException, InterruptedException, ReplicationException.CompatibilityException;
 
+
+    /**
+     * Return a ledger auditor manager, which is used to
+     * coordinate the auto-recovery process.
+     *
+     * @return ledger auditor manager
+     * @see LedgerAuditorManager
+     */
+    LedgerAuditorManager newLedgerAuditorManager() throws IOException, InterruptedException;
+
     /**
      * Format the ledger metadata for LedgerManager.
      *
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
index 9157973..a218ef3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LegacyHierarchicalLedgerManagerFactory.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.ACL;
@@ -79,6 +81,12 @@ public class LegacyHierarchicalLedgerManagerFactory extends AbstractZkLedgerMana
     }
 
     @Override
+    public LedgerAuditorManager newLedgerAuditorManager() {
+        ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+        return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+    }
+
+    @Override
     public LedgerManager newLedgerManager() {
         return new LegacyHierarchicalLedgerManager(conf, zk);
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
index e15e0a5..3ad303a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
@@ -42,6 +42,7 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.AbstractConfiguration;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.metastore.MSException;
 import org.apache.bookkeeper.metastore.MSWatchedEvent;
@@ -60,6 +61,7 @@ import org.apache.bookkeeper.metastore.Value;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.replication.ReplicationException;
+import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.StringUtils;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -815,4 +817,10 @@ public class MSLedgerManagerFactory extends AbstractZkLedgerManagerFactory {
                 zkServers, zkLedgersRootPath);
         return true;
     }
+
+    @Override
+    public LedgerAuditorManager newLedgerAuditorManager() {
+        ServerConfiguration serverConfiguration = new ServerConfiguration(conf);
+        return new ZkLedgerAuditorManager(zk, serverConfiguration, NullStatsLogger.INSTANCE);
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java
new file mode 100644
index 0000000..fa1fcd4
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ZkLedgerAuditorManager.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.meta;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
+import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS;
+import com.google.protobuf.TextFormat;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.annotations.StatsDoc;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+/**
+ * ZK based implementation of LedgerAuditorManager.
+ */
+@Slf4j
+public class ZkLedgerAuditorManager implements LedgerAuditorManager {
+
+    private final ZooKeeper zkc;
+    private final ServerConfiguration conf;
+    private final String basePath;
+    private final String electionPath;
+
+    private String myVote;
+
+    private static final String ELECTION_ZNODE = "auditorelection";
+
+    // Represents the index of the auditor node
+    private static final int AUDITOR_INDEX = 0;
+    // Represents vote prefix
+    private static final String VOTE_PREFIX = "V_";
+    // Represents path Separator
+    private static final String PATH_SEPARATOR = "/";
+
+    private volatile Consumer<AuditorEvent> listener;
+    private volatile boolean isClosed = false;
+
+    // Expose Stats
+    @StatsDoc(
+            name = ELECTION_ATTEMPTS,
+            help = "The number of auditor election attempts"
+    )
+    private final Counter electionAttempts;
+
+    public ZkLedgerAuditorManager(ZooKeeper zkc, ServerConfiguration conf, StatsLogger statsLogger) {
+        this.zkc = zkc;
+        this.conf = conf;
+
+        this.basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
+                + BookKeeperConstants.UNDER_REPLICATION_NODE;
+        this.electionPath = basePath + '/' + ELECTION_ZNODE;
+        this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
+    }
+
+    @Override
+    public void tryToBecomeAuditor(String bookieId, Consumer<AuditorEvent> listener)
+            throws IOException, InterruptedException {
+        this.listener = listener;
+        createElectorPath();
+
+        try {
+            while (!isClosed) {
+                createMyVote(bookieId);
+
+                List<String> children = zkc.getChildren(getVotePath(""), false);
+                if (0 >= children.size()) {
+                    throw new IllegalArgumentException(
+                            "At least one bookie server should present to elect the Auditor!");
+                }
+
+                // sorting in ascending order of sequential number
+                Collections.sort(children, new ElectionComparator());
+                String voteNode = StringUtils.substringAfterLast(myVote, PATH_SEPARATOR);
+
+                if (children.get(AUDITOR_INDEX).equals(voteNode)) {
+                    // We have been elected as the auditor.
+                    // Update the auditor bookie id in the election path. This is
+                    // done for debugging purposes.
+                    AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
+                            .setBookieId(bookieId);
+
+                    zkc.setData(getVotePath(""),
+                            builder.build().toString().getBytes(UTF_8), -1);
+                    return;
+                 } else {
+                    // Not the auditor: watch the predecessor's vote node and
+                    // wait for it to be deleted.
+                    int myIndex = children.indexOf(voteNode);
+                    if (myIndex < 0) {
+                        throw new IllegalArgumentException("My vote has disappeared");
+                    }
+
+                    int prevNodeIndex = myIndex - 1;
+
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
+                            + children.get(prevNodeIndex), event -> latch.countDown())) {
+                        // The previous znode was already gone when we tried to watch it.
+                        // Run the election again.
+                        continue;
+                    }
+
+                    // Wait for the previous auditor in line to be deleted
+                    latch.await();
+                }
+
+                electionAttempts.inc();
+            }
+        } catch (KeeperException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public BookieId getCurrentAuditor() throws IOException, InterruptedException {
+        String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
+                + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE;
+
+        try {
+            List<String> children = zkc.getChildren(electionRoot, false);
+            Collections.sort(children, new ElectionComparator());
+            if (children.size() < 1) {
+                return null;
+            }
+            String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX);
+            byte[] data = zkc.getData(ledger, false, null);
+
+            AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder();
+            TextFormat.merge(new String(data, UTF_8), builder);
+            AuditorVoteFormat v = builder.build();
+            return BookieId.parse(v.getBookieId());
+        } catch (KeeperException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        log.info("Shutting down AuditorElector");
+        isClosed = true;
+        if (myVote != null) {
+            try {
+                zkc.delete(myVote, -1);
+            } catch (KeeperException.NoNodeException nne) {
+                // Ok
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                log.warn("InterruptedException while deleting myVote: " + myVote,
+                        ie);
+            } catch (KeeperException ke) {
+                log.error("Exception while deleting myVote:" + myVote, ke);
+            }
+        }
+    }
+
+    private void createMyVote(String bookieId) throws IOException, InterruptedException {
+        List<ACL> zkAcls = ZkUtils.getACLs(conf);
+        AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
+                .setBookieId(bookieId);
+
+        try {
+            if (null == myVote || null == zkc.exists(myVote, false)) {
+                myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX),
+                        builder.build().toString().getBytes(UTF_8), zkAcls,
+                        CreateMode.EPHEMERAL_SEQUENTIAL);
+            }
+        } catch (KeeperException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private void createElectorPath() throws IOException {
+        try {
+            List<ACL> zkAcls = ZkUtils.getACLs(conf);
+            if (zkc.exists(basePath, false) == null) {
+                try {
+                    zkc.create(basePath, new byte[0], zkAcls,
+                            CreateMode.PERSISTENT);
+                } catch (KeeperException.NodeExistsException nee) {
+                    // do nothing, someone else could have created it
+                }
+            }
+            if (zkc.exists(getVotePath(""), false) == null) {
+                try {
+                    zkc.create(getVotePath(""), new byte[0],
+                            zkAcls, CreateMode.PERSISTENT);
+                } catch (KeeperException.NodeExistsException nee) {
+                    // do nothing, someone else could have created it
+                }
+            }
+        } catch (KeeperException ke) {
+            throw new IOException("Failed to initialize Auditor Elector", ke);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to initialize Auditor Elector", ie);
+        }
+    }
+
+    private String getVotePath(String vote) {
+        return electionPath + vote;
+    }
+
+    private void handleZkWatch(WatchedEvent event) {
+        if (isClosed) {
+            return;
+        }
+
+        if (event.getState() == Watcher.Event.KeeperState.Expired) {
+            log.error("Lost ZK connection, shutting down");
+
+            listener.accept(AuditorEvent.SessionLost);
+        } else if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
+            listener.accept(AuditorEvent.VoteWasDeleted);
+        }
+    }
+
+    /**
+     * Compare the votes in the ascending order of the sequence number. Vote
+     * format is 'V_sequencenumber', comparator will do sorting based on the
+     * numeric sequence value.
+     */
+    private static class ElectionComparator
+            implements Comparator<String>, Serializable {
+        /**
+         * Return -1 if the first vote is less than second. Return 1 if the
+         * first vote is greater than second. Return 0 if the votes are equal.
+         */
+        @Override
+        public int compare(String vote1, String vote2) {
+            long voteSeqId1 = getVoteSequenceId(vote1);
+            long voteSeqId2 = getVoteSequenceId(vote2);
+            int result = voteSeqId1 < voteSeqId2 ? -1
+                    : (voteSeqId1 > voteSeqId2 ? 1 : 0);
+            return result;
+        }
+
+        private long getVoteSequenceId(String vote) {
+            String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX);
+            return Long.parseLong(voteId);
+        }
+    }
+
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index 7028466..eaab84c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -20,47 +20,25 @@
  */
 package org.apache.bookkeeper.replication;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
-import static org.apache.bookkeeper.replication.ReplicationStats.ELECTION_ATTEMPTS;
-
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.TextFormat;
-
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ZkLayoutManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.annotations.StatsDoc;
-import org.apache.bookkeeper.util.BookKeeperConstants;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,35 +58,18 @@ import org.slf4j.LoggerFactory;
 public class AuditorElector {
     private static final Logger LOG = LoggerFactory
             .getLogger(AuditorElector.class);
-    // Represents the index of the auditor node
-    private static final int AUDITOR_INDEX = 0;
-    // Represents vote prefix
-    private static final String VOTE_PREFIX = "V_";
-    // Represents path Separator
-    private static final String PATH_SEPARATOR = "/";
-    private static final String ELECTION_ZNODE = "auditorelection";
-    // Represents urLedger path in zk
-    private final String basePath;
-    // Represents auditor election path in zk
-    private final String electionPath;
 
     private final String bookieId;
     private final ServerConfiguration conf;
     private final BookKeeper bkc;
-    private final ZooKeeper zkc;
     private final boolean ownBkc;
     private final ExecutorService executor;
+    private final LedgerAuditorManager ledgerAuditorManager;
 
-    private String myVote;
     Auditor auditor;
     private AtomicBoolean running = new AtomicBoolean(false);
 
-    // Expose Stats
-    @StatsDoc(
-        name = ELECTION_ATTEMPTS,
-        help = "The number of auditor election attempts"
-    )
-    private final Counter electionAttempts;
+
     private final StatsLogger statsLogger;
 
 
@@ -163,13 +124,12 @@ public class AuditorElector {
         this.conf = conf;
         this.bkc = bkc;
         this.ownBkc = ownBkc;
-        this.zkc = ((ZkLayoutManager) bkc.getMetadataClientDriver().getLayoutManager()).getZk();
         this.statsLogger = statsLogger;
-        this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
-        basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
-                + BookKeeperConstants.UNDER_REPLICATION_NODE;
-        electionPath = basePath + '/' + ELECTION_ZNODE;
-        createElectorPath();
+        try {
+            this.ledgerAuditorManager = bkc.getLedgerManagerFactory().newLedgerAuditorManager();
+        } catch (Exception e) {
+            throw new UnavailableException("Failed to instantiate the ledger auditor manager", e);
+        }
         executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                 @Override
                 public Thread newThread(Runnable r) {
@@ -178,70 +138,6 @@ public class AuditorElector {
             });
     }
 
-    private void createMyVote() throws KeeperException, InterruptedException {
-        if (null == myVote || null == zkc.exists(myVote, false)) {
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
-                .setBookieId(bookieId);
-            myVote = zkc.create(getVotePath(PATH_SEPARATOR + VOTE_PREFIX),
-                    builder.build().toString().getBytes(UTF_8), zkAcls,
-                    CreateMode.EPHEMERAL_SEQUENTIAL);
-        }
-    }
-
-    String getMyVote() {
-        return myVote;
-    }
-
-    private String getVotePath(String vote) {
-        return electionPath + vote;
-    }
-
-    private void createElectorPath() throws UnavailableException {
-        try {
-            List<ACL> zkAcls = ZkUtils.getACLs(conf);
-            if (zkc.exists(basePath, false) == null) {
-                try {
-                    zkc.create(basePath, new byte[0], zkAcls,
-                            CreateMode.PERSISTENT);
-                } catch (KeeperException.NodeExistsException nee) {
-                    // do nothing, someone else could have created it
-                }
-            }
-            if (zkc.exists(getVotePath(""), false) == null) {
-                try {
-                    zkc.create(getVotePath(""), new byte[0],
-                            zkAcls, CreateMode.PERSISTENT);
-                } catch (KeeperException.NodeExistsException nee) {
-                    // do nothing, someone else could have created it
-                }
-            }
-        } catch (KeeperException ke) {
-            throw new UnavailableException(
-                    "Failed to initialize Auditor Elector", ke);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-            throw new UnavailableException(
-                    "Failed to initialize Auditor Elector", ie);
-        }
-    }
-
-    /**
-     * Watching the predecessor bookies and will do election on predecessor node
-     * deletion or expiration.
-     */
-    private class ElectionWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) {
-            if (event.getState() == KeeperState.Expired) {
-                LOG.error("Lost ZK connection, shutting down");
-                submitShutdownTask();
-            } else if (event.getType() == EventType.NodeDeleted) {
-                submitElectionTask();
-            }
-        }
-    }
-
     public Future<?> start() {
         running.set(true);
         return submitElectionTask();
@@ -257,17 +153,14 @@ public class AuditorElector {
                     if (!running.compareAndSet(true, false)) {
                         return;
                     }
-                    LOG.info("Shutting down AuditorElector");
-                    if (myVote != null) {
-                        try {
-                            zkc.delete(myVote, -1);
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            LOG.warn("InterruptedException while deleting myVote: " + myVote,
-                                     ie);
-                        } catch (KeeperException ke) {
-                            LOG.error("Exception while deleting myVote:" + myVote, ke);
-                        }
+
+                    try {
+                        ledgerAuditorManager.close();
+                    } catch (InterruptedException ie) {
+                        Thread.currentThread().interrupt();
+                        LOG.warn("InterruptedException while closing ledger auditor manager", ie);
+                    } catch (Exception ke) {
+                        LOG.error("Exception while closing ledger auditor manager", ke);
                     }
                 }
             });
@@ -288,59 +181,39 @@ public class AuditorElector {
                         return;
                     }
                     try {
-                        // creating my vote in zk. Vote format is 'V_numeric'
-                        createMyVote();
-                        List<String> children = zkc.getChildren(getVotePath(""), false);
-
-                        if (0 >= children.size()) {
-                            throw new IllegalArgumentException(
-                                    "Atleast one bookie server should present to elect the Auditor!");
-                        }
-
-                        // sorting in ascending order of sequential number
-                        Collections.sort(children, new ElectionComparator());
-                        String voteNode = StringUtils.substringAfterLast(myVote,
-                                                                         PATH_SEPARATOR);
+                        ledgerAuditorManager.tryToBecomeAuditor(bookieId, e -> handleAuditorEvent(e));
 
-                        // starting Auditing service
-                        if (children.get(AUDITOR_INDEX).equals(voteNode)) {
-                            // update the auditor bookie id in the election path. This is
-                            // done for debugging purpose
-                            AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder()
-                                .setBookieId(bookieId);
-
-                            zkc.setData(getVotePath(""),
-                                        builder.build().toString().getBytes(UTF_8), -1);
-                            auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
-                            auditor.start();
-                        } else {
-                            // If not an auditor, will be watching to my predecessor and
-                            // looking the previous node deletion.
-                            Watcher electionWatcher = new ElectionWatcher();
-                            int myIndex = children.indexOf(voteNode);
-                            int prevNodeIndex = myIndex - 1;
-                            if (null == zkc.exists(getVotePath(PATH_SEPARATOR)
-                                                   + children.get(prevNodeIndex), electionWatcher)) {
-                                // While adding, the previous znode doesn't exists.
-                                // Again going to election.
-                                submitElectionTask();
-                            }
-                            electionAttempts.inc();
-                        }
-                    } catch (KeeperException e) {
-                        LOG.error("Exception while performing auditor election", e);
-                        submitShutdownTask();
+                        auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
+                        auditor.start();
                     } catch (InterruptedException e) {
                         LOG.error("Interrupted while performing auditor election", e);
                         Thread.currentThread().interrupt();
                         submitShutdownTask();
-                    } catch (UnavailableException e) {
-                        LOG.error("Ledger underreplication manager unavailable during election", e);
+                    } catch (Exception e) {
+                        LOG.error("Exception while performing auditor election", e);
                         submitShutdownTask();
                     }
                 }
             };
-        return executor.submit(r);
+        try {
+            return executor.submit(r);
+        } catch (RejectedExecutionException e) {
+            LOG.debug("Executor was already closed");
+            return CompletableFuture.completedFuture(null);
+        }
+    }
+
+    private void handleAuditorEvent(LedgerAuditorManager.AuditorEvent e) { // callback given to tryToBecomeAuditor
+        switch (e) { // no default branch: any other event value is ignored
+            case SessionLost: // stop this elector by queueing the shutdown task
+                LOG.error("Lost ZK connection, shutting down");
+                submitShutdownTask();
+                break;
+
+            case VoteWasDeleted: // queue another election attempt
+                submitElectionTask();
+                break;
+        }
+    }
 
     @VisibleForTesting
@@ -348,33 +221,21 @@ public class AuditorElector {
         return auditor;
     }
 
-    /**
-     * Query zookeeper for the currently elected auditor.
-     * @return the bookie id of the current auditor
-     */
-    public static BookieId getCurrentAuditor(ServerConfiguration conf, ZooKeeper zk)
-            throws KeeperException, InterruptedException, IOException {
-        String electionRoot = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
-            + BookKeeperConstants.UNDER_REPLICATION_NODE + '/' + ELECTION_ZNODE;
-
-        List<String> children = zk.getChildren(electionRoot, false);
-        Collections.sort(children, new AuditorElector.ElectionComparator());
-        if (children.size() < 1) {
-            return null;
-        }
-        String ledger = electionRoot + "/" + children.get(AUDITOR_INDEX);
-        byte[] data = zk.getData(ledger, false, null);
 
-        AuditorVoteFormat.Builder builder = AuditorVoteFormat.newBuilder();
-        TextFormat.merge(new String(data, UTF_8), builder);
-        AuditorVoteFormat v = builder.build();
-        return BookieId.parse(v.getBookieId());
+    public BookieId getCurrentAuditor() throws IOException, InterruptedException { // instance-level lookup
+        return ledgerAuditorManager.getCurrentAuditor(); // delegates; tests poll until non-null
+    }
 
     /**
      * Shutting down AuditorElector.
      */
     public void shutdown() throws InterruptedException {
+        try {
+            ledgerAuditorManager.close();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
         synchronized (this) {
             if (executor.isShutdown()) {
                 return;
@@ -413,30 +274,4 @@ public class AuditorElector {
     public String toString() {
         return "AuditorElector for " + bookieId;
     }
-
-    /**
-     * Compare the votes in the ascending order of the sequence number. Vote
-     * format is 'V_sequencenumber', comparator will do sorting based on the
-     * numeric sequence value.
-     */
-    private static class ElectionComparator
-        implements Comparator<String>, Serializable {
-        /**
-         * Return -1 if the first vote is less than second. Return 1 if the
-         * first vote is greater than second. Return 0 if the votes are equal.
-         */
-        @Override
-        public int compare(String vote1, String vote2) {
-            long voteSeqId1 = getVoteSequenceId(vote1);
-            long voteSeqId2 = getVoteSequenceId(vote2);
-            int result = voteSeqId1 < voteSeqId2 ? -1
-                    : (voteSeqId1 > voteSeqId2 ? 1 : 0);
-            return result;
-        }
-
-        private long getVoteSequenceId(String vote) {
-            String voteId = StringUtils.substringAfter(vote, VOTE_PREFIX);
-            return Long.parseLong(voteId);
-        }
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
index e9ac414..76705b9 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/BKHttpServiceProvider.java
@@ -36,7 +36,6 @@ import org.apache.bookkeeper.http.HttpServiceProvider;
 import org.apache.bookkeeper.http.service.ErrorHttpService;
 import org.apache.bookkeeper.http.service.HeartbeatService;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.Auditor;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -64,16 +63,12 @@ import org.apache.bookkeeper.server.http.service.TriggerAuditService;
 import org.apache.bookkeeper.server.http.service.TriggerGCService;
 import org.apache.bookkeeper.server.http.service.WhoIsAuditorService;
 import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 
 /**
  * Bookkeeper based implementation of HttpServiceProvider,
  * which provide bookkeeper services to handle http requests
  * from different http endpoints.
- *
- * <p>TODO: eliminate the direct usage of zookeeper here {@link https://github.com/apache/bookkeeper/issues/1332}
  */
 @Slf4j
 public class BKHttpServiceProvider implements HttpServiceProvider {
@@ -82,7 +77,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
     private final BookieServer bookieServer;
     private final AutoRecoveryMain autoRecovery;
     private final ServerConfiguration serverConf;
-    private final ZooKeeper zk;
     private final BookKeeperAdmin bka;
     private final ExecutorService executor;
 
@@ -95,12 +89,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
         this.autoRecovery = autoRecovery;
         this.serverConf = serverConf;
         this.statsProvider = statsProvider;
-        String zkServers = ZKMetadataDriverBase.resolveZkServers(serverConf);
-        this.zk = ZooKeeperClient.newBuilder()
-          .connectString(zkServers)
-          .sessionTimeoutMs(serverConf.getZkTimeout())
-          .build();
-
         ClientConfiguration clientConfiguration = new ClientConfiguration(serverConf);
         this.bka = new BookKeeperAdmin(clientConfiguration);
 
@@ -115,9 +103,6 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
             if (bka != null) {
                 bka.close();
             }
-            if (zk != null) {
-                zk.close();
-            }
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             log.error("Interruption while closing BKHttpServiceProvider", ie);
@@ -236,7 +221,7 @@ public class BKHttpServiceProvider implements HttpServiceProvider {
             case LIST_UNDER_REPLICATED_LEDGER:
                 return new ListUnderReplicatedLedgerService(configuration, bookieServer);
             case WHO_IS_AUDITOR:
-                return new WhoIsAuditorService(configuration, zk);
+                return new WhoIsAuditorService(configuration, bka);
             case TRIGGER_AUDIT:
                 return new TriggerAuditService(configuration, bka);
             case LOST_BOOKIE_RECOVERY_DELAY:
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
index 8bb7824..bafa932 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/WhoIsAuditorService.java
@@ -20,14 +20,13 @@ package org.apache.bookkeeper.server.http.service;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.service.HttpEndpointService;
 import org.apache.bookkeeper.http.service.HttpServiceRequest;
 import org.apache.bookkeeper.http.service.HttpServiceResponse;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.AuditorElector;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,12 +40,12 @@ public class WhoIsAuditorService implements HttpEndpointService {
     static final Logger LOG = LoggerFactory.getLogger(WhoIsAuditorService.class);
 
     protected ServerConfiguration conf;
-    protected ZooKeeper zk;
+    protected BookKeeperAdmin bka;
 
-    public WhoIsAuditorService(ServerConfiguration conf, ZooKeeper zk) {
+    public WhoIsAuditorService(ServerConfiguration conf, BookKeeperAdmin bka) {
         checkNotNull(conf);
         this.conf = conf;
-        this.zk = zk;
+        this.bka = bka;
     }
 
     /*
@@ -57,9 +56,9 @@ public class WhoIsAuditorService implements HttpEndpointService {
         HttpServiceResponse response = new HttpServiceResponse();
 
         if (HttpServer.Method.GET == request.getMethod()) {
-            BookieId bookieId = null;
+            BookieId bookieId;
             try {
-                bookieId = AuditorElector.getCurrentAuditor(conf, zk);
+                bookieId = bka.getCurrentAuditor();
 
                 if (bookieId == null) {
                     response.setCode(HttpServer.StatusCode.NOT_FOUND);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
index 2aa1298..853fca0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommand.java
@@ -21,18 +21,15 @@ package org.apache.bookkeeper.tools.cli.commands.autorecovery;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.io.IOException;
-import java.net.URI;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.tools.cli.helpers.BookieCommand;
 import org.apache.bookkeeper.tools.framework.CliFlags;
 import org.apache.bookkeeper.tools.framework.CliSpec;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,12 +43,19 @@ public class WhoIsAuditorCommand extends BookieCommand<CliFlags> {
     private static final String NAME = "whoisauditor";
     private static final String DESC = "Print the node which holds the auditor lock.";
 
+    private BookKeeperAdmin bka;
+
     public WhoIsAuditorCommand() {
+        this(null);
+    }
+
+    public WhoIsAuditorCommand(BookKeeperAdmin bka) {
         super(CliSpec.newBuilder()
                      .withName(NAME)
                      .withDescription(DESC)
                      .withFlags(new CliFlags())
                      .build());
+        this.bka = bka;
     }
 
     @Override
@@ -64,26 +68,22 @@ public class WhoIsAuditorCommand extends BookieCommand<CliFlags> {
     }
 
     private boolean getAuditor(ServerConfiguration conf)
-        throws ConfigurationException, InterruptedException, IOException, KeeperException {
-        ZooKeeper zk = null;
-        try {
-            String metadataServiceUri = conf.getMetadataServiceUri();
-            String zkServers = ZKMetadataDriverBase.getZKServersFromServiceUri(URI.create(metadataServiceUri));
-            zk = ZooKeeperClient.newBuilder()
-                                .connectString(zkServers)
-                                .sessionTimeoutMs(conf.getZkTimeout())
-                                .build();
-            BookieId bookieId = AuditorElector.getCurrentAuditor(conf, zk);
-            if (bookieId == null) {
-                LOG.info("No auditor elected");
-                return false;
-            }
-            LOG.info("Auditor: " + bookieId);
-        } finally {
-            if (zk != null) {
-                zk.close();
-            }
+            throws BKException, InterruptedException, IOException {
+        ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
+
+        BookieId bookieId;
+        if (this.bka != null) {
+            bookieId = bka.getCurrentAuditor();
+        } else {
+            @Cleanup
+            BookKeeperAdmin bka = new BookKeeperAdmin(clientConfiguration);
+            bookieId = bka.getCurrentAuditor();
+        }
+        if (bookieId == null) {
+            LOG.info("No auditor elected");
+            return false;
         }
+        LOG.info("Auditor: " + bookieId);
         return true;
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
index e361294..c692e1e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java
@@ -31,6 +31,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
+import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -53,13 +54,20 @@ public class BookKeeperTestClient extends BookKeeper {
         this.statsProvider = statsProvider;
     }
 
+    public BookKeeperTestClient(ClientConfiguration conf, ZooKeeper zkc) // builds on a caller-supplied ZK handle
+            throws IOException, InterruptedException, BKException {
+        super(conf, zkc, null, new UnpooledByteBufAllocator(false),
+                NullStatsLogger.INSTANCE, null, null, null);
+        this.statsProvider = null; // this constructor takes no TestStatsProvider
+    }
+
     public BookKeeperTestClient(ClientConfiguration conf)
             throws InterruptedException, BKException, IOException {
-        this(conf, null);
+        this(conf, (TestStatsProvider) null);
     }
 
     public ZooKeeper getZkHandle() {
-        return super.getZkHandle();
+        return ((ZKMetadataClientDriver) metadataDriver).getZk();
     }
 
     public ClientConfiguration getConf() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
index d26b226..75f3d8e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java
@@ -55,11 +55,6 @@ public class MockBookKeeper extends BookKeeper {
     final ZooKeeper zkc;
 
     @Override
-    public ZooKeeper getZkHandle() {
-        return zkc;
-    }
-
-    @Override
     public ClientConfiguration getConf() {
         return super.getConf();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
index 9b5853a..ce115a6 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBookieWatcher.java
@@ -160,7 +160,7 @@ public class TestBookieWatcher extends BookKeeperClusterTestCase {
         ClientConfiguration conf = new ClientConfiguration();
         conf.setMetadataServiceUri(metadataServiceUri);
 
-        try (BookKeeper bkc = new BookKeeper(conf, zk)) {
+        try (BookKeeperTestClient bkc = new BookKeeperTestClient(conf, zk)) {
 
             LedgerHandle lh;
             try {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index b1210b7..13ee214 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -24,9 +24,11 @@ import static org.apache.bookkeeper.meta.MetadataDrivers.runFunctionWithLedgerMa
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerAuditorManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieId;
@@ -77,7 +79,9 @@ public class AuditorRollingRestartTest extends BookKeeperClusterTestCase {
                      underReplicationManager.pollLedgerToRereplicate(), -1);
         underReplicationManager.disableLedgerReplication();
 
-        BookieId auditor = AuditorElector.getCurrentAuditor(baseConf, zkc);
+        @Cleanup
+        LedgerAuditorManager lam = mFactory.newLedgerAuditorManager();
+        BookieId auditor = lam.getCurrentAuditor();
         ServerConfiguration conf = killBookie(auditor);
         Thread.sleep(2000);
         startBookie(conf);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
index 2ed92b2..0c2221a 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java
@@ -21,13 +21,13 @@
 package org.apache.bookkeeper.replication;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import java.io.IOException;
 import org.apache.bookkeeper.bookie.BookieImpl;
 import org.apache.bookkeeper.meta.zk.ZKMetadataClientDriver;
 import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 
@@ -97,9 +97,22 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
          */
         ZKMetadataClientDriver zkMetadataClientDriver1 = startAutoRecoveryMain(main1);
         ZooKeeper zk1 = zkMetadataClientDriver1.getZk();
-        Auditor auditor1 = main1.auditorElector.getAuditor();
 
-        BookieId currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(0), zk1);
+        // Wait until auditor gets elected
+        for (int i = 0; i < 10; i++) {
+            try {
+                if (main1.auditorElector.getCurrentAuditor() != null) {
+                    break;
+                } else {
+                    Thread.sleep(1000);
+                }
+            } catch (IOException e) {
+                Thread.sleep(1000);
+            }
+        }
+        BookieId currentAuditor = main1.auditorElector.getCurrentAuditor();
+        assertNotNull(currentAuditor);
+        Auditor auditor1 = main1.auditorElector.getAuditor();
         assertTrue("Current Auditor should be AR1", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(0))));
         assertTrue("Auditor of AR1 should be running", auditor1.isRunning());
 
@@ -142,18 +155,9 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
         }
 
         /*
-         * since zk1 and zk2 sessions are expired, the 'myVote' ephemeral nodes
-         * of AR1 and AR2 should not be existing anymore.
-         */
-        assertTrue("AR1's vote node should not be existing",
-                zk3.exists(main1.auditorElector.getMyVote(), false) == null);
-        assertTrue("AR2's vote node should not be existing",
-                zk3.exists(main2.auditorElector.getMyVote(), false) == null);
-
-        /*
          * the AR3 should be current auditor.
          */
-        currentAuditor = AuditorElector.getCurrentAuditor(confByIndex(2), zk3);
+        currentAuditor = main3.auditorElector.getCurrentAuditor();
         assertTrue("Current Auditor should be AR3", currentAuditor.equals(BookieImpl.getBookieId(confByIndex(2))));
         auditor3 = main3.auditorElector.getAuditor();
         assertTrue("Auditor of AR3 should be running", auditor3.isRunning());
@@ -180,29 +184,12 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase {
      * start autoRecoveryMain and make sure all its components are running and
      * myVote node is existing
      */
-    ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain)
-            throws InterruptedException, KeeperException, UnavailableException {
+    ZKMetadataClientDriver startAutoRecoveryMain(AutoRecoveryMain autoRecoveryMain) {
         autoRecoveryMain.start();
         ZKMetadataClientDriver metadataClientDriver = (ZKMetadataClientDriver) autoRecoveryMain.bkc
                 .getMetadataClientDriver();
-        ZooKeeper zk = metadataClientDriver.getZk();
-        String myVote;
-        for (int i = 0; i < 10; i++) {
-            if (autoRecoveryMain.auditorElector.isRunning() && autoRecoveryMain.replicationWorker.isRunning()
-                    && autoRecoveryMain.isAutoRecoveryRunning()) {
-                myVote = autoRecoveryMain.auditorElector.getMyVote();
-                if (myVote != null) {
-                    if (null != zk.exists(myVote, false)) {
-                        break;
-                    }
-                }
-            }
-            Thread.sleep(100);
-        }
         assertTrue("autoRecoveryMain components should be running", autoRecoveryMain.auditorElector.isRunning()
                 && autoRecoveryMain.replicationWorker.isRunning() && autoRecoveryMain.isAutoRecoveryRunning());
-        assertTrue("autoRecoveryMain's vote node should be existing",
-                zk.exists(autoRecoveryMain.auditorElector.getMyVote(), false) != null);
         return metadataClientDriver;
     }
 }
diff --git a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
index d21856a..f0e74fc 100644
--- a/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
+++ b/tools/ledger/src/test/java/org/apache/bookkeeper/tools/cli/commands/autorecovery/WhoIsAuditorCommandTest.java
@@ -27,6 +27,8 @@ import static org.powermock.api.mockito.PowerMockito.when;
 
 import java.net.URI;
 import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieId;
@@ -75,10 +77,6 @@ public class WhoIsAuditorCommandTest extends BookieCommandTestBase {
 
         BookieId bookieId = BookieId.parse(UUID.randomUUID().toString());
 
-        PowerMockito.mockStatic(AuditorElector.class);
-        PowerMockito.when(AuditorElector.getCurrentAuditor(eq(conf), eq(zk)))
-                    .thenReturn(bookieId);
-
         PowerMockito.mockStatic(CommandHelpers.class);
         PowerMockito.when(CommandHelpers
                 .getBookieSocketAddrStringRepresentation(
@@ -86,8 +84,11 @@ public class WhoIsAuditorCommandTest extends BookieCommandTestBase {
     }
 
     @Test
-    public void testCommand() {
-        WhoIsAuditorCommand cmd = new WhoIsAuditorCommand();
+    public void testCommand() throws Exception {
+        @Cleanup
+        BookKeeperAdmin bka = mock(BookKeeperAdmin.class);
+        when(bka.getCurrentAuditor()).thenReturn(BookieId.parse("127.0.0.1:3181"));
+        WhoIsAuditorCommand cmd = new WhoIsAuditorCommand(bka);
         Assert.assertTrue(cmd.apply(bkFlags, new String[] { "" }));
     }
 }