You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/09/26 09:42:56 UTC

[bookkeeper] branch master updated: autorecovery-use-metadata-driver (part 3) : remove zookeeper from Auditor class

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 2837f62  autorecovery-use-metadata-driver (part 3) : remove zookeeper from Auditor class
2837f62 is described below

commit 2837f6257baf15dc9dd9eb4bcac34596b442be33
Author: Qi Wang <42...@users.noreply.github.com>
AuthorDate: Wed Sep 26 02:42:51 2018 -0700

    autorecovery-use-metadata-driver (part 3) : remove zookeeper from Auditor class
    
    Descriptions of the changes in this PR:
    
    This is the 3rd change to remove zookeeper from Auditor. It also changes how
    we construct metadata driver from AutoRecoveryMain. Since we are using bookkeeper client
    and admin across auditor and replication. So change the AutoRecoveryMain to construct
    the bookkeeper client from client configuration and pass bookkeeper client down to
    replication worker and auditor.
    
    
    
    
    Author: Qi Wang <co...@gmail.com>
    Author: Qi Wang <42...@users.noreply.github.com>
    Author: Sijie Guo <gu...@gmail.com>
    Author: Charan Reddy Guttapalem <re...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1695 from codingwangqi/autorecovery_metadata_part3
---
 .../org/apache/bookkeeper/replication/Auditor.java | 79 +++++++++++++++-------
 .../bookkeeper/replication/AuditorElector.java     | 50 +++++++++++---
 .../bookkeeper/replication/AutoRecoveryMain.java   | 53 ++++++---------
 .../bookkeeper/replication/ReplicationWorker.java  | 38 ++++++-----
 .../bookkeeper/replication/AuditorBookieTest.java  |  9 +--
 .../replication/AuditorLedgerCheckerTest.java      |  3 +-
 .../AuditorPeriodicBookieCheckTest.java            | 13 +---
 .../replication/AuditorPeriodicCheckTest.java      | 18 +----
 .../bookkeeper/server/http/TestHttpService.java    | 11 +--
 9 files changed, 142 insertions(+), 132 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
index 788aaa2..c22889c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java
@@ -52,7 +52,6 @@ import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
@@ -62,11 +61,9 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +79,8 @@ import org.slf4j.LoggerFactory;
 public class Auditor implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(Auditor.class);
     private final ServerConfiguration conf;
-    private BookKeeper bkc;
+    private final BookKeeper bkc;
+    private final boolean ownBkc;
     private BookKeeperAdmin admin;
     private BookieLedgerIndexer bookieLedgerIndexer;
     private LedgerManager ledgerManager;
@@ -104,8 +102,48 @@ public class Auditor implements AutoCloseable {
     private Set<String> bookiesToBeAudited = Sets.newHashSet();
     private volatile int lostBookieRecoveryDelayBeforeChange;
 
-    public Auditor(final String bookieIdentifier, ServerConfiguration conf,
-                   ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
+    static BookKeeper createBookKeeperClient(ServerConfiguration conf)
+            throws InterruptedException, IOException {
+        ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
+        clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
+        try {
+            return BookKeeper.forConfig(clientConfiguration).build();
+        } catch (BKException e) {
+            throw new IOException("Failed to create bookkeeper client", e);
+        }
+    }
+
+    static BookKeeper createBookKeeperClientThrowUnavailableException(ServerConfiguration conf)
+        throws UnavailableException {
+        try {
+            return createBookKeeperClient(conf);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new UnavailableException("Failed to create bookkeeper client", e);
+        } catch (IOException e) {
+            throw new UnavailableException("Failed to create bookkeeper client", e);
+        }
+    }
+
+
+    public Auditor(final String bookieIdentifier,
+                   ServerConfiguration conf,
+                   StatsLogger statsLogger)
+        throws UnavailableException {
+        this(
+            bookieIdentifier,
+            conf,
+            createBookKeeperClientThrowUnavailableException(conf),
+            true,
+            statsLogger);
+    }
+
+    public Auditor(final String bookieIdentifier,
+                   ServerConfiguration conf,
+                   BookKeeper bkc,
+                   boolean ownBkc,
+                   StatsLogger statsLogger)
+        throws UnavailableException {
         this.conf = conf;
         this.bookieIdentifier = bookieIdentifier;
         this.statsLogger = statsLogger;
@@ -123,7 +161,9 @@ public class Auditor implements AutoCloseable {
         numDelayedBookieAuditsCancelled = this.statsLogger
                 .getCounter(ReplicationStats.NUM_DELAYED_BOOKIE_AUDITS_DELAYES_CANCELLED);
 
-        initialize(conf, zkc);
+        this.bkc = bkc;
+        this.ownBkc = ownBkc;
+        initialize(conf, bkc);
 
         executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
                 @Override
@@ -135,15 +175,9 @@ public class Auditor implements AutoCloseable {
             });
     }
 
-    private void initialize(ServerConfiguration conf, ZooKeeper zkc)
+    private void initialize(ServerConfiguration conf, BookKeeper bkc)
             throws UnavailableException {
         try {
-            ClientConfiguration clientConfiguration = new ClientConfiguration(conf);
-            clientConfiguration.setClientRole(ClientConfiguration.CLIENT_ROLE_SYSTEM);
-            LOG.info("AuthProvider used by the Auditor is {}",
-                clientConfiguration.getClientAuthProviderFactoryClass());
-            this.bkc = new BookKeeper(clientConfiguration, zkc);
-
             LedgerManagerFactory ledgerManagerFactory = AbstractZkLedgerManagerFactory
                     .newLedgerManagerFactory(
                         conf,
@@ -154,6 +188,8 @@ public class Auditor implements AutoCloseable {
             this.ledgerUnderreplicationManager = ledgerManagerFactory
                     .newLedgerUnderreplicationManager();
             this.admin = new BookKeeperAdmin(bkc, statsLogger);
+            LOG.info("AuthProvider used by the Auditor is {}",
+                admin.getConf().getClientAuthProviderFactoryClass());
             if (this.ledgerUnderreplicationManager
                     .initializeLostBookieRecoveryDelay(conf.getLostBookieRecoveryDelay())) {
                 LOG.info("Initializing lostBookieRecoveryDelay zNode to the conif value: {}",
@@ -166,7 +202,7 @@ public class Auditor implements AutoCloseable {
         } catch (CompatibilityException ce) {
             throw new UnavailableException(
                     "CompatibilityException while initializing Auditor", ce);
-        } catch (IOException | BKException | KeeperException ioe) {
+        } catch (IOException | KeeperException ioe) {
             throw new UnavailableException(
                     "Exception while initializing Auditor", ioe);
         } catch (InterruptedException ie) {
@@ -601,13 +637,7 @@ public class Auditor implements AutoCloseable {
      * be run very often.
      */
     void checkAllLedgers() throws BKException, IOException, InterruptedException, KeeperException {
-        ZooKeeper newzk = ZooKeeperClient.newBuilder()
-                .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
-                .sessionTimeoutMs(conf.getZkTimeout())
-                .build();
-
-        final BookKeeper client = new BookKeeper(new ClientConfiguration(conf),
-                                                 newzk);
+        final BookKeeper client = createBookKeeperClient(conf);
         final BookKeeperAdmin admin = new BookKeeperAdmin(client, statsLogger);
 
         try {
@@ -664,7 +694,6 @@ public class Auditor implements AutoCloseable {
         } finally {
             admin.close();
             client.close();
-            newzk.close();
         }
     }
 
@@ -680,7 +709,9 @@ public class Auditor implements AutoCloseable {
                 executor.shutdownNow();
             }
             admin.close();
-            bkc.close();
+            if (ownBkc) {
+                bkc.close();
+            }
         } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             LOG.warn("Interrupted while shutting down auditor bookie", ie);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
index 1ea179c..cddb1f0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AuditorElector.java
@@ -37,7 +37,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.ZkLayoutManager;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.DataFormats.AuditorVoteFormat;
@@ -85,7 +88,9 @@ public class AuditorElector {
 
     private final String bookieId;
     private final ServerConfiguration conf;
+    private final BookKeeper bkc;
     private final ZooKeeper zkc;
+    private final boolean ownBkc;
     private final ExecutorService executor;
 
     private String myVote;
@@ -97,6 +102,15 @@ public class AuditorElector {
     private final StatsLogger statsLogger;
 
 
+    @VisibleForTesting
+    public AuditorElector(final String bookieId, ServerConfiguration conf) throws UnavailableException {
+        this(
+            bookieId,
+            conf,
+            Auditor.createBookKeeperClientThrowUnavailableException(conf),
+            true);
+    }
+
     /**
      * AuditorElector for performing the auditor election.
      *
@@ -104,14 +118,16 @@ public class AuditorElector {
      *            - bookie identifier, comprises HostAddress:Port
      * @param conf
      *            - configuration
-     * @param zkc
-     *            - ZK instance
+     * @param bkc
+     *            - bookkeeper instance
      * @throws UnavailableException
      *             throws unavailable exception while initializing the elector
      */
-    public AuditorElector(final String bookieId, ServerConfiguration conf,
-                          ZooKeeper zkc) throws UnavailableException {
-        this(bookieId, conf, zkc, NullStatsLogger.INSTANCE);
+    public AuditorElector(final String bookieId,
+                          ServerConfiguration conf,
+                          BookKeeper bkc,
+                          boolean ownBkc) throws UnavailableException {
+        this(bookieId, conf, bkc, NullStatsLogger.INSTANCE, ownBkc);
     }
 
     /**
@@ -121,18 +137,23 @@ public class AuditorElector {
      *            - bookie identifier, comprises HostAddress:Port
      * @param conf
      *            - configuration
-     * @param zkc
-     *            - ZK instance
+     * @param bkc
+     *            - bookkeeper instance
      * @param statsLogger
      *            - stats logger
      * @throws UnavailableException
      *             throws unavailable exception while initializing the elector
      */
-    public AuditorElector(final String bookieId, ServerConfiguration conf,
-                          ZooKeeper zkc, StatsLogger statsLogger) throws UnavailableException {
+    public AuditorElector(final String bookieId,
+                          ServerConfiguration conf,
+                          BookKeeper bkc,
+                          StatsLogger statsLogger,
+                          boolean ownBkc) throws UnavailableException {
         this.bookieId = bookieId;
         this.conf = conf;
-        this.zkc = zkc;
+        this.bkc = bkc;
+        this.ownBkc = ownBkc;
+        this.zkc = ((ZkLayoutManager) bkc.getMetadataClientDriver().getLayoutManager()).getZk();
         this.statsLogger = statsLogger;
         this.electionAttempts = statsLogger.getCounter(ELECTION_ATTEMPTS);
         basePath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf) + '/'
@@ -274,7 +295,7 @@ public class AuditorElector {
 
                             zkc.setData(getVotePath(""),
                                         TextFormat.printToString(builder.build()).getBytes(UTF_8), -1);
-                            auditor = new Auditor(bookieId, conf, zkc, statsLogger);
+                            auditor = new Auditor(bookieId, conf, bkc, false, statsLogger);
                             auditor.start();
                         } else {
                             // If not an auditor, will be watching to my predecessor and
@@ -352,6 +373,13 @@ public class AuditorElector {
             auditor.shutdown();
             auditor = null;
         }
+        if (ownBkc) {
+            try {
+                bkc.close();
+            } catch (BKException e) {
+                LOG.warn("Failed to close bookkeeper client", e);
+            }
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index b43ff85..d63e19e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -20,7 +20,6 @@
  */
 package org.apache.bookkeeper.replication;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE;
 
@@ -29,18 +28,15 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.net.URI;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.ExitCode;
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.http.HttpServer;
 import org.apache.bookkeeper.http.HttpServerLoader;
-import org.apache.bookkeeper.meta.MetadataBookieDriver;
-import org.apache.bookkeeper.meta.MetadataDrivers;
-import org.apache.bookkeeper.meta.exceptions.MetadataException;
-import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
@@ -53,7 +49,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +62,7 @@ public class AutoRecoveryMain {
             .getLogger(AutoRecoveryMain.class);
 
     private final ServerConfiguration conf;
-    final MetadataBookieDriver metadataBookieDriver;
+    final BookKeeper bkc;
     final AuditorElector auditorElector;
     final ReplicationWorker replicationWorker;
     final AutoRecoveryDeathWatcher deathWatcher;
@@ -85,37 +80,22 @@ public class AutoRecoveryMain {
             throws IOException, InterruptedException, KeeperException, UnavailableException,
             CompatibilityException {
         this.conf = conf;
-        this.metadataBookieDriver = initializeMetadataDriver(conf, statsLogger);
+        this.bkc = Auditor.createBookKeeperClient(conf);
 
-        auditorElector = new AuditorElector(Bookie.getBookieAddress(conf).toString(), conf,
-            getZooKeeperFromMetadataDriver(metadataBookieDriver),
-            statsLogger.scope(AUDITOR_SCOPE));
+        auditorElector = new AuditorElector(
+            Bookie.getBookieAddress(conf).toString(),
+            conf,
+            bkc,
+            statsLogger.scope(AUDITOR_SCOPE),
+            false);
         replicationWorker = new ReplicationWorker(
             conf,
+            bkc,
+            false,
             statsLogger.scope(REPLICATION_WORKER_SCOPE));
         deathWatcher = new AutoRecoveryDeathWatcher(this);
     }
 
-    private MetadataBookieDriver initializeMetadataDriver(ServerConfiguration conf, StatsLogger statsLogger)
-            throws IOException {
-        String metadataServiceUri = conf.getMetadataServiceUriUnchecked();
-        MetadataBookieDriver driver = MetadataDrivers.getBookieDriver(
-            URI.create(metadataServiceUri));
-        try {
-            driver.initialize(conf, () -> {}, statsLogger);
-        } catch (MetadataException e) {
-            throw new IOException("Failed to initialize metadata driver at " + metadataServiceUri, e);
-        }
-        return driver;
-    }
-
-    // it existing because AuditorElector takes zookeeper
-    ZooKeeper getZooKeeperFromMetadataDriver(MetadataBookieDriver driver) {
-        checkArgument(driver instanceof ZKMetadataBookieDriver);
-        ZKMetadataBookieDriver zkDriver = (ZKMetadataBookieDriver) driver;
-        return zkDriver.getZk();
-    }
-
     /*
      * Start daemons
      */
@@ -164,7 +144,14 @@ public class AutoRecoveryMain {
             LOG.warn("Interrupted shutting down auditor elector", e);
         }
         replicationWorker.shutdown();
-        metadataBookieDriver.close();
+        try {
+            bkc.close();
+        } catch (BKException e) {
+            LOG.warn("Failed to close bookkeeper client for auto recovery", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted closing bookkeeper client for auto recovery", e);
+        }
     }
 
     private int getExitCode() {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
index b536fb0..e3f9ac6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/ReplicationWorker.java
@@ -19,7 +19,6 @@
  */
 package org.apache.bookkeeper.replication;
 
-import static org.apache.bookkeeper.replication.ReplicationStats.BK_CLIENT_SCOPE;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_DEFER_LEDGER_LOCK_RELEASE_OF_FAILED_LEDGER;
 import static org.apache.bookkeeper.replication.ReplicationStats.NUM_FULL_OR_PARTIAL_LEDGERS_REPLICATED;
 import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATE_EXCEPTION;
@@ -53,7 +52,6 @@ import org.apache.bookkeeper.client.LedgerChecker;
 import org.apache.bookkeeper.client.LedgerFragment;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.LedgerMetadata;
-import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
@@ -86,6 +84,7 @@ public class ReplicationWorker implements Runnable {
     private final BookKeeperAdmin admin;
     private final LedgerChecker ledgerChecker;
     private final BookKeeper bkc;
+    private final boolean ownBkc;
     private final Thread workerThread;
     private final long rwRereplicateBackoffMs;
     private final long openLedgerRereplicationGracePeriod;
@@ -127,15 +126,20 @@ public class ReplicationWorker implements Runnable {
     public ReplicationWorker(final ServerConfiguration conf,
                              StatsLogger statsLogger)
             throws CompatibilityException, KeeperException,
+
+            InterruptedException, IOException {
+        this(conf, Auditor.createBookKeeperClient(conf), true, statsLogger);
+    }
+
+    ReplicationWorker(final ServerConfiguration conf,
+                      BookKeeper bkc,
+                      boolean ownBkc,
+                      StatsLogger statsLogger)
+            throws CompatibilityException, KeeperException,
             InterruptedException, IOException {
         this.conf = conf;
-        try {
-            this.bkc = BookKeeper.forConfig(new ClientConfiguration(conf))
-                .statsLogger(statsLogger.scope(BK_CLIENT_SCOPE))
-                .build();
-        } catch (BKException e) {
-            throw new IOException("Failed to instantiate replication worker", e);
-        }
+        this.bkc = bkc;
+        this.ownBkc = ownBkc;
         LedgerManagerFactory mFactory = AbstractZkLedgerManagerFactory
                 .newLedgerManagerFactory(
                     this.conf,
@@ -505,13 +509,15 @@ public class ReplicationWorker implements Runnable {
                     e);
             Thread.currentThread().interrupt();
         }
-        try {
-            bkc.close();
-        } catch (InterruptedException e) {
-            LOG.warn("Interrupted while closing the Bookie client", e);
-            Thread.currentThread().interrupt();
-        } catch (BKException e) {
-            LOG.warn("Exception while closing the Bookie client", e);
+        if (ownBkc) {
+            try {
+                bkc.close();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while closing the Bookie client", e);
+                Thread.currentThread().interrupt();
+            } catch (BKException e) {
+                LOG.warn("Exception while closing the Bookie client", e);
+            }
         }
         try {
             underreplicationManager.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
index 2587865..f32b787 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorBookieTest.java
@@ -35,7 +35,6 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.ZooKeeper;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -209,14 +208,8 @@ public class AuditorBookieTest extends BookKeeperClusterTestCase {
     }
 
     private void startAuditorElector(String addr) throws Exception {
-        ZooKeeper zk = ZooKeeperClient.newBuilder()
-                .connectString(zkUtil.getZooKeeperConnectString())
-                .sessionTimeoutMs(10000)
-                .build();
-        zkClients.add(zk);
-
         AuditorElector auditorElector = new AuditorElector(addr,
-                                                           baseConf, zk);
+                                                           baseConf);
         auditorElectors.put(addr, auditorElector);
         auditorElector.start();
         LOG.debug("Starting Auditor Elector");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
index 65c6922..a09720b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorLedgerCheckerTest.java
@@ -146,8 +146,7 @@ public class AuditorLedgerCheckerTest extends BookKeeperClusterTestCase {
     private void startAuditorElectors() throws Exception {
         for (BookieServer bserver : bs) {
             String addr = bserver.getLocalAddress().toString();
-            AuditorElector auditorElector = new AuditorElector(addr,
-                    baseConf, zkc);
+            AuditorElector auditorElector = new AuditorElector(addr, baseConf);
             auditorElectors.put(addr, auditorElector);
             auditorElector.start();
             LOG.debug("Starting Auditor Elector");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
index 3ec8ae1..e53b3ff 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicBookieCheckTest.java
@@ -35,12 +35,9 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
-import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,7 +53,6 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
             .getLogger(AuditorPeriodicBookieCheckTest.class);
 
     private AuditorElector auditorElector = null;
-    private ZooKeeper auditorZookeeper = null;
 
     private static final int CHECK_INTERVAL = 1; // run every second
 
@@ -75,13 +71,7 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
         conf.setMetadataServiceUri(metadataServiceUri);
         String addr = bs.get(0).getLocalAddress().toString();
 
-        auditorZookeeper = ZooKeeperClient.newBuilder()
-                .connectString(ZKMetadataDriverBase.resolveZkServers(conf))
-                .sessionTimeoutMs(10000)
-                .build();
-
-        auditorElector = new AuditorElector(addr, conf,
-                auditorZookeeper);
+        auditorElector = new AuditorElector(addr, conf);
         auditorElector.start();
     }
 
@@ -89,7 +79,6 @@ public class AuditorPeriodicBookieCheckTest extends BookKeeperClusterTestCase {
     @Override
     public void tearDown() throws Exception {
         auditorElector.shutdown();
-        auditorZookeeper.close();
 
         super.tearDown();
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
index 0668088..84788a3 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorPeriodicCheckTest.java
@@ -58,8 +58,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.ZooKeeper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,7 +74,6 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
 
     private MetadataBookieDriver driver;
     private HashMap<String, AuditorElector> auditorElectors = new HashMap<String, AuditorElector>();
-    private List<ZooKeeper> zkClients = new LinkedList<ZooKeeper>();
 
     private static final int CHECK_INTERVAL = 1; // run every second
 
@@ -96,14 +93,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
 
             String addr = bs.get(i).getLocalAddress().toString();
 
-            ZooKeeper zk = ZooKeeperClient.newBuilder()
-                    .connectString(zkUtil.getZooKeeperConnectString())
-                    .sessionTimeoutMs(10000)
-                    .build();
-            zkClients.add(zk);
-
-            AuditorElector auditorElector = new AuditorElector(addr,
-                                                               conf, zk);
+            AuditorElector auditorElector = new AuditorElector(addr, conf);
             auditorElectors.put(addr, auditorElector);
             auditorElector.start();
             LOG.debug("Starting Auditor Elector");
@@ -127,10 +117,6 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
         for (AuditorElector e : auditorElectors.values()) {
             e.shutdown();
         }
-        for (ZooKeeper zk : zkClients) {
-            zk.close();
-        }
-        zkClients.clear();
         super.tearDown();
     }
 
@@ -339,7 +325,7 @@ public class AuditorPeriodicCheckTest extends BookKeeperClusterTestCase {
 
         try (final Auditor auditor = new Auditor(
                 Bookie.getBookieAddress(bsConfs.get(0)).toString(),
-                bsConfs.get(0), zkc, NullStatsLogger.INSTANCE)) {
+                bsConfs.get(0), NullStatsLogger.INSTANCE)) {
             final AtomicBoolean exceptionCaught = new AtomicBoolean(false);
             final CountDownLatch latch = new CountDownLatch(1);
             Thread t = new Thread() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
index d14b8a4..d328bd4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/server/http/TestHttpService.java
@@ -51,8 +51,6 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallbackFuture;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.zookeeper.ZooKeeper;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -585,25 +583,18 @@ public class TestHttpService extends BookKeeperClusterTestCase {
         assertEquals(HttpServer.StatusCode.OK.getValue(), response5.getStatusCode());
     }
 
-    ZooKeeper auditorZookeeper;
     AuditorElector auditorElector;
     private Future<?> startAuditorElector() throws Exception {
-        auditorZookeeper = ZooKeeperClient.newBuilder()
-          .connectString(zkUtil.getZooKeeperConnectString())
-          .sessionTimeoutMs(10000)
-          .build();
         String addr = bs.get(0).getLocalAddress().toString();
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setAuditorPeriodicBookieCheckInterval(1);
         conf.setMetadataServiceUri("zk://" + zkUtil.getZooKeeperConnectString() + "/ledgers");
-        auditorElector = new AuditorElector(addr, conf,
-          auditorZookeeper);
+        auditorElector = new AuditorElector(addr, conf);
         return auditorElector.start();
     }
 
     private void stopAuditorElector() throws Exception {
         auditorElector.shutdown();
-        auditorZookeeper.close();
     }
 
     @Test