You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/29 03:42:07 UTC

[storm] branch master updated: STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 27921e2  STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
     new 7e98f29  Merge branch 'STORM-3354' of https://github.com/srdo/storm into STORM-3354-merge
27921e2 is described below

commit 27921e208a1373f4f3d8b1639693510e23cb593b
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Thu Mar 7 12:36:03 2019 +0100

    STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
---
 .../storm/messaging/netty/StormServerHandler.java  |  2 +-
 .../org/apache/storm/nimbus/ILeaderElector.java    | 17 +++--
 .../src/jvm/org/apache/storm/utils/Utils.java      | 27 ++++----
 .../apache/storm/testing/MockLeaderElector.java    |  2 +-
 .../apache/storm/blobstore/LocalFsBlobStore.java   |  2 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java     |  6 +-
 .../storm/nimbus/LeaderListenerCallback.java       | 26 ++++---
 .../apache/storm/zookeeper/LeaderElectorImp.java   | 77 +++++++++-----------
 .../zookeeper/LeaderListenerCallbackFactory.java   | 81 ++++++++++++++++++++++
 .../java/org/apache/storm/zookeeper/Zookeeper.java | 45 ++----------
 10 files changed, 162 insertions(+), 123 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 55e7058..3c256bb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
 
 public class StormServerHandler extends ChannelInboundHandlerAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
-    private static final Set<Class> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
+    private static final Set<Class<?>> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class<?>[]{ IOException.class }));
     private final IServer server;
     private final AtomicInteger failure_count;
 
diff --git a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
index 3fdb407..f0d877f 100644
--- a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
@@ -12,7 +12,6 @@
 
 package org.apache.storm.nimbus;
 
-import java.io.Closeable;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -21,7 +20,7 @@ import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 /**
  * The interface for leader election.
  */
-public interface ILeaderElector extends Closeable {
+public interface ILeaderElector extends AutoCloseable {
 
     /**
      * Method guaranteed to be called as part of initialization of leader elector instance.
@@ -37,11 +36,11 @@ public interface ILeaderElector extends Closeable {
     void addToLeaderLockQueue() throws Exception;
 
     /**
-     * Removes the caller from the leader lock queue. If the caller is leader
-     * also releases the lock. This method can be called multiple times so it needs
-     * to be idempotent.
+     * Removes the caller from leadership election, relinquishing leadership if acquired, then requeues for leadership after the specified
+     * delay.
+     * @param delayMs The delay to wait before re-entering the election
      */
-    void removeFromLeaderLockQueue() throws Exception;
+    void quitElectionFor(int delayMs) throws Exception;
 
     /**
      * Decide if the caller currently has the leader lock.
@@ -51,7 +50,7 @@ public interface ILeaderElector extends Closeable {
 
     /**
      * Get the current leader's address.
-     * @return the current leader's address , may return null if no one has the lock.
+     * @return the current leader's address, may return null if no one has the lock.
      */
     NimbusInfo getLeader();
     
@@ -71,9 +70,9 @@ public interface ILeaderElector extends Closeable {
     List<NimbusInfo> getAllNimbuses() throws Exception;
 
     /**
-     * Method called to allow for cleanup. once close this object can not be reused.
+     * Method called to allow for cleanup. Relinquishes leadership if owned by the caller.
      */
     @Override
-    void close();
+    void close() throws Exception;
 }
 
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 102720f..8d15fc2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -33,6 +33,7 @@ import java.io.ObjectOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.net.InetAddress;
@@ -110,7 +111,7 @@ import org.slf4j.LoggerFactory;
 public class Utils {
     public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
     public static final String DEFAULT_STREAM_ID = "default";
-    private static final Set<Class> defaultAllowedExceptions = new HashSet<>();
+    private static final Set<Class<?>> defaultAllowedExceptions = Collections.emptySet();
     private static final List<String> LOCALHOST_ADDRESSES = Lists.newArrayList("localhost", "127.0.0.1", "0:0:0:0:0:0:0:1");
     static SerializationDelegate serializationDelegate;
     private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
@@ -625,7 +626,7 @@ public class Utils {
         handleUncaughtException(t, defaultAllowedExceptions);
     }
 
-    public static void handleUncaughtException(Throwable t, Set<Class> allowedExceptions) {
+    public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
         if (t != null) {
             if (t instanceof OutOfMemoryError) {
                 try {
@@ -975,17 +976,19 @@ public class Utils {
         return m;
     }
 
-    public static void setupDefaultUncaughtExceptionHandler() {
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-            public void uncaughtException(Thread thread, Throwable thrown) {
-                try {
-                    handleUncaughtException(thrown);
-                } catch (Error err) {
-                    LOG.error("Received error in main thread.. terminating server...", err);
-                    Runtime.getRuntime().exit(-2);
-                }
+    public static UncaughtExceptionHandler createDefaultUncaughtExceptionHandler() {
+        return (thread, thrown) -> {
+            try {
+                handleUncaughtException(thrown);
+            } catch (Error err) {
+                LOG.error("Received error in thread {}.. terminating server...", thread.getName(), err);
+                Runtime.getRuntime().exit(-2);
             }
-        });
+        };
+    }
+    
+    public static void setupDefaultUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
     }
 
     public static Map<String, Object> findAndReadConfigFile(String name) {
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
index aa13c01..2599b4e 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
@@ -47,7 +47,7 @@ public class MockLeaderElector implements ILeaderElector {
     }
 
     @Override
-    public void removeFromLeaderLockQueue() throws Exception {
+    public void quitElectionFor(int delayMs) throws Exception {
         //NOOP
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 4458758..77d9d01 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -171,7 +171,7 @@ public class LocalFsBlobStore extends BlobStore {
                 sync.setZookeeperKeySet(zkKeys);
                 sync.setZkClient(zkClient);
                 sync.syncBlobs();
-            } //else not leader (NOOP)
+            } //else leader (NOOP)
         } //else local (NOOP)
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index c4edaa7..07bba66 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -455,7 +455,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private MetricStore metricsStore;
     private IAuthorizer authorizationHandler;
     //Cached CuratorFramework, mainly used for BlobStore.
-    private CuratorFramework zkClient;
+    private final CuratorFramework zkClient;
     //Cached topology -> executor ids, used for deciding timeout workers of heartbeatsCache.
     private AtomicReference<Map<String, Set<List<Integer>>>> idToExecutors;
     //May be null if worker tokens are not supported by the thrift transport.
@@ -4625,9 +4625,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
             if (actionNotifier != null) {
                 actionNotifier.cleanup();
             }
-            if (zkClient != null) {
-                zkClient.close();
-            }
+            zkClient.close();
             if (metricsStore != null) {
                 metricsStore.close();
             }
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index 2e1a6ca..bab42ce 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -23,6 +23,7 @@ import javax.security.auth.Subject;
 import com.codahale.metrics.Meter;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.blobstore.InputStreamWithMeta;
 import org.apache.storm.cluster.ClusterUtils;
@@ -39,6 +40,7 @@ import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
 import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Utils;
 import org.apache.storm.zookeeper.ClientZookeeper;
 import org.slf4j.Logger;
@@ -60,31 +62,35 @@ public class LeaderListenerCallback {
     private final TopoCache tc;
     private final IStormClusterState clusterState;
     private final CuratorFramework zk;
-    private final LeaderLatch leaderLatch;
+    private final ILeaderElector leaderElector;
     private final Map conf;
     private final List<ACL> acls;
+    private final int requeueDelayMs;
 
     /**
      * Constructor for {@LeaderListenerCallback}.
      * @param conf config
      * @param zk zookeeper CuratorFramework client
-     * @param leaderLatch LeaderLatch
      * @param blobStore BlobStore
+     * @param leaderElector Leader elector
      * @param tc TopoCache
      * @param clusterState IStormClusterState
      * @param acls zookeeper acls
      */
-    public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
+    public LeaderListenerCallback(Map conf, CuratorFramework zk, BlobStore blobStore, ILeaderElector leaderElector,
                                   TopoCache tc, IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
         this.blobStore = blobStore;
         this.tc = tc;
         this.clusterState = clusterState;
         this.zk = zk;
-        this.leaderLatch = leaderLatch;
+        this.leaderElector = leaderElector;
         this.conf = conf;
         this.acls = acls;
         this.numGainedLeader = metricsRegistry.registerMeter("nimbus:num-gained-leadership");
         this.numLostLeader = metricsRegistry.registerMeter("nimbus:num-lost-leadership");
+        //Since we only give up leadership if we're waiting for blobs to sync,
+        //it makes sense to wait a full sync cycle before trying for leadership again.
+        this.requeueDelayMs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS))*1000;
     }
 
     /**
@@ -129,11 +135,11 @@ public class LeaderListenerCallback {
             } else {
                 LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
                          + "giving up leadership.");
-                closeLatch();
+                surrenderLeadership();
             }
         } else {
             LOG.info("code for all active topologies not available locally, giving up leadership.");
-            closeLatch();
+            surrenderLeadership();
         }
     }
 
@@ -218,11 +224,11 @@ public class LeaderListenerCallback {
         return activeTopologyDependencies;
     }
 
-    private void closeLatch() {
+    private void surrenderLeadership() {
         try {
-            leaderLatch.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+            leaderElector.quitElectionFor(requeueDelayMs);
+        } catch (Exception e) {
+            throw Utils.wrapInRuntime(e);
         }
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 6f53baa..7e887fb 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -18,54 +18,32 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.nimbus.TopoCache;
-import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.StormTimer;
 import org.apache.storm.nimbus.ILeaderElector;
-import org.apache.storm.nimbus.LeaderListenerCallback;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.Participant;
-import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class LeaderElectorImp implements ILeaderElector {
-    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
-    private final Map<String, Object> conf;
-    private final List<String> servers;
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
     private final CuratorFramework zk;
-    private final String leaderlockPath;
+    private final String leaderLockPath = "/leader-lock";
     private final String id;
     private final AtomicReference<LeaderLatch> leaderLatch;
-    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
-    private final BlobStore blobStore;
-    private final TopoCache tc;
-    private final IStormClusterState clusterState;
-    private final List<ACL> acls;
-    private final StormMetricsRegistry metricsRegistry;
+    private final LeaderListenerCallbackFactory leaderListenerCallbackFactory;
+    private final StormTimer timer;
 
-    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
-                            AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
-                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
-                            StormMetricsRegistry metricsRegistry) {
-        this.conf = conf;
-        this.servers = servers;
+    public LeaderElectorImp(CuratorFramework zk, String id, LeaderListenerCallbackFactory leaderListenerCallbackFactory) {
         this.zk = zk;
-        this.leaderlockPath = leaderlockPath;
         this.id = id;
-        this.leaderLatch = leaderLatch;
-        this.leaderLatchListener = leaderLatchListener;
-        this.blobStore = blobStore;
-        this.tc = tc;
-        this.clusterState = clusterState;
-        this.acls = acls;
-        this.metricsRegistry = metricsRegistry;
+        this.leaderLatch = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
+        this.leaderListenerCallbackFactory = leaderListenerCallbackFactory;
+        this.timer = new StormTimer("leader-elector-timer", Utils.createDefaultUncaughtExceptionHandler());
     }
 
     @Override
@@ -75,17 +53,17 @@ public class LeaderElectorImp implements ILeaderElector {
 
     @Override
     public void addToLeaderLockQueue() throws Exception {
-        // if this latch is already closed, we need to create new instance.
+        // if this latch is closed, we need to create new instance.
         if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
-            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
-            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
-                metricsRegistry);
-            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
-            LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
+            LeaderLatch latch = new LeaderLatch(zk, leaderLockPath, id);
+            latch.addListener(leaderListenerCallbackFactory.create(this));
+            latch.start();
+            leaderLatch.set(latch);
+            LOG.info("LeaderLatch was in closed state. Reset the leaderLatch, and queued for leader lock.");
         }
-        // Only if the latch is not already started we invoke start
+        // If the latch is not started yet, start it
         if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
-            leaderLatch.get().addListener(leaderLatchListener.get());
+            leaderLatch.get().addListener(leaderListenerCallbackFactory.create(this));
             leaderLatch.get().start();
             LOG.info("Queued up for leader lock.");
         } else {
@@ -94,13 +72,23 @@ public class LeaderElectorImp implements ILeaderElector {
     }
 
     @Override
-    // Only started latches can be closed.
-    public void removeFromLeaderLockQueue() throws Exception {
+    public void quitElectionFor(int delayMs) throws Exception {
+        removeFromLeaderLockQueue();
+        timer.schedule(delayMs, () -> {
+            try {
+                addToLeaderLockQueue();
+            } catch (Exception e) {
+                throw Utils.wrapInRuntime(e);
+            }
+        }, false, 0); //Don't error if timer is shut down, happens when the elector is closed.
+    }
+    
+    private void removeFromLeaderLockQueue() throws Exception {
         if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
             leaderLatch.get().close();
             LOG.info("Removed from leader lock queue.");
         } else {
-            LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
+            LOG.info("Leader latch is not started so no removeFromLeaderLockQueue needed.");
         }
     }
 
@@ -135,7 +123,8 @@ public class LeaderElectorImp implements ILeaderElector {
     }
 
     @Override
-    public void close() {
-        //Do nothing now.
+    public void close() throws Exception {
+        timer.close();
+        removeFromLeaderLockQueue();
     }
 }
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java
new file mode 100644
index 0000000..28d7882
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.LeaderListenerCallback;
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LeaderListenerCallbackFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallbackFactory.class);
+    
+    private final Map<String, Object> conf;
+    private final CuratorFramework zk;
+    private final BlobStore blobStore;
+    private final TopoCache tc;
+    private final IStormClusterState clusterState;
+    private final List<ACL> acls;
+    private final StormMetricsRegistry metricsRegistry;
+
+    public LeaderListenerCallbackFactory(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore, TopoCache tc,
+        IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
+        this.conf = conf;
+        this.zk = zk;
+        this.blobStore = blobStore;
+        this.tc = tc;
+        this.clusterState = clusterState;
+        this.acls = acls;
+        this.metricsRegistry = metricsRegistry;
+    }
+    
+    public LeaderLatchListener create(ILeaderElector elector) throws UnknownHostException {
+        final LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, blobStore, elector,
+            tc, clusterState, acls, metricsRegistry);
+        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        return new LeaderLatchListener() {
+
+            @Override
+            public void isLeader() {
+                callback.leaderCallBack();
+                LOG.info("{} gained leadership.", hostName);
+            }
+
+            @Override
+            public void notLeader() {
+                LOG.info("{} lost leadership.", hostName);
+                //Just to be sure
+                callback.notLeaderCallback();
+            }
+        };
+    }
+    
+}
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index 5468573..9171038 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -20,24 +20,17 @@ package org.apache.storm.zookeeper;
 
 import java.io.File;
 import java.net.BindException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.daemon.nimbus.TopoCache;
 import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.nimbus.ILeaderElector;
-import org.apache.storm.nimbus.LeaderListenerCallback;
 import org.apache.storm.nimbus.NimbusInfo;
 import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
 import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.Participant;
 import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
 import org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory;
@@ -112,27 +105,6 @@ public class Zookeeper {
         return nimbusInfo;
     }
 
-    // Leader latch listener that will be invoked when we either gain or lose leadership
-    public static LeaderLatchListener leaderLatchListenerImpl(final LeaderListenerCallback callback)
-        throws UnknownHostException {
-        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
-        return new LeaderLatchListener() {
-
-            @Override
-            public void isLeader() {
-                callback.leaderCallBack();
-                LOG.info("{} gained leadership.", hostName);
-            }
-
-            @Override
-            public void notLeader() {
-                LOG.info("{} lost leadership.", hostName);
-                //Just to be sure
-                callback.notLeaderCallback();
-            }
-        };
-    }
-
     /**
      * Get master leader elector.
      *
@@ -143,28 +115,19 @@ public class Zookeeper {
      * @param clusterState {@link IStormClusterState}
      * @param acls         ACLs
      * @return Instance of {@link ILeaderElector}
-     *
-     * @throws UnknownHostException
      */
     public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore,
                                                  final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
-                                                 StormMetricsRegistry metricsRegistry) throws UnknownHostException {
+                                                 StormMetricsRegistry metricsRegistry) {
         return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls, metricsRegistry);
     }
 
     protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore,
                                                  final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
-                                                 StormMetricsRegistry metricsRegistry) throws
-        UnknownHostException {
-        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        String leaderLockPath = "/leader-lock";
+                                                 StormMetricsRegistry metricsRegistry) {
         String id = NimbusInfo.fromConf(conf).toHostPortString();
-        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
-        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
-            new AtomicReference<>(leaderLatchListenerImpl(
-                new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls, metricsRegistry)));
-        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
-                                    leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls, metricsRegistry);
+        return new LeaderElectorImp(zk, id,
+            new LeaderListenerCallbackFactory(conf, zk, blobStore, tc, clusterState, acls, metricsRegistry));
     }
 
 }