You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2019/03/29 03:42:07 UTC
[storm] branch master updated: STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 27921e2 STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
new 7e98f29 Merge branch 'STORM-3354' of https://github.com/srdo/storm into STORM-3354-merge
27921e2 is described below
commit 27921e208a1373f4f3d8b1639693510e23cb593b
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Thu Mar 7 12:36:03 2019 +0100
STORM-3354: Fix Nimbus not reentering leadership election in some cases, and quit leadership election cleanly when Nimbus shuts down
---
.../storm/messaging/netty/StormServerHandler.java | 2 +-
.../org/apache/storm/nimbus/ILeaderElector.java | 17 +++--
.../src/jvm/org/apache/storm/utils/Utils.java | 27 ++++----
.../apache/storm/testing/MockLeaderElector.java | 2 +-
.../apache/storm/blobstore/LocalFsBlobStore.java | 2 +-
.../org/apache/storm/daemon/nimbus/Nimbus.java | 6 +-
.../storm/nimbus/LeaderListenerCallback.java | 26 ++++---
.../apache/storm/zookeeper/LeaderElectorImp.java | 77 +++++++++-----------
.../zookeeper/LeaderListenerCallbackFactory.java | 81 ++++++++++++++++++++++
.../java/org/apache/storm/zookeeper/Zookeeper.java | 45 ++----------
10 files changed, 162 insertions(+), 123 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 55e7058..3c256bb 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
public class StormServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
- private static final Set<Class> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
+ private static final Set<Class<?>> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class<?>[]{ IOException.class }));
private final IServer server;
private final AtomicInteger failure_count;
diff --git a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
index 3fdb407..f0d877f 100644
--- a/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
+++ b/storm-client/src/jvm/org/apache/storm/nimbus/ILeaderElector.java
@@ -12,7 +12,6 @@
package org.apache.storm.nimbus;
-import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -21,7 +20,7 @@ import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
/**
* The interface for leader election.
*/
-public interface ILeaderElector extends Closeable {
+public interface ILeaderElector extends AutoCloseable {
/**
* Method guaranteed to be called as part of initialization of leader elector instance.
@@ -37,11 +36,11 @@ public interface ILeaderElector extends Closeable {
void addToLeaderLockQueue() throws Exception;
/**
- * Removes the caller from the leader lock queue. If the caller is leader
- * also releases the lock. This method can be called multiple times so it needs
- * to be idempotent.
+ * Removes the caller from leadership election, relinquishing leadership if acquired, then requeues for leadership after the specified
+ * delay.
+ * @param delayMs The delay to wait before re-entering the election
*/
- void removeFromLeaderLockQueue() throws Exception;
+ void quitElectionFor(int delayMs) throws Exception;
/**
* Decide if the caller currently has the leader lock.
@@ -51,7 +50,7 @@ public interface ILeaderElector extends Closeable {
/**
* Get the current leader's address.
- * @return the current leader's address , may return null if no one has the lock.
+ * @return the current leader's address, may return null if no one has the lock.
*/
NimbusInfo getLeader();
@@ -71,9 +70,9 @@ public interface ILeaderElector extends Closeable {
List<NimbusInfo> getAllNimbuses() throws Exception;
/**
- * Method called to allow for cleanup. once close this object can not be reused.
+ * Method called to allow for cleanup. Relinquishes leadership if owned by the caller.
*/
@Override
- void close();
+ void close() throws Exception;
}
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 102720f..8d15fc2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -33,6 +33,7 @@ import java.io.ObjectOutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.net.InetAddress;
@@ -110,7 +111,7 @@ import org.slf4j.LoggerFactory;
public class Utils {
public static final Logger LOG = LoggerFactory.getLogger(Utils.class);
public static final String DEFAULT_STREAM_ID = "default";
- private static final Set<Class> defaultAllowedExceptions = new HashSet<>();
+ private static final Set<Class<?>> defaultAllowedExceptions = Collections.emptySet();
private static final List<String> LOCALHOST_ADDRESSES = Lists.newArrayList("localhost", "127.0.0.1", "0:0:0:0:0:0:0:1");
static SerializationDelegate serializationDelegate;
private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
@@ -625,7 +626,7 @@ public class Utils {
handleUncaughtException(t, defaultAllowedExceptions);
}
- public static void handleUncaughtException(Throwable t, Set<Class> allowedExceptions) {
+ public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
if (t != null) {
if (t instanceof OutOfMemoryError) {
try {
@@ -975,17 +976,19 @@ public class Utils {
return m;
}
- public static void setupDefaultUncaughtExceptionHandler() {
- Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- public void uncaughtException(Thread thread, Throwable thrown) {
- try {
- handleUncaughtException(thrown);
- } catch (Error err) {
- LOG.error("Received error in main thread.. terminating server...", err);
- Runtime.getRuntime().exit(-2);
- }
+ public static UncaughtExceptionHandler createDefaultUncaughtExceptionHandler() {
+ return (thread, thrown) -> {
+ try {
+ handleUncaughtException(thrown);
+ } catch (Error err) {
+ LOG.error("Received error in thread {}.. terminating server...", thread.getName(), err);
+ Runtime.getRuntime().exit(-2);
}
- });
+ };
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
}
public static Map<String, Object> findAndReadConfigFile(String name) {
diff --git a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
index aa13c01..2599b4e 100644
--- a/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
+++ b/storm-core/src/jvm/org/apache/storm/testing/MockLeaderElector.java
@@ -47,7 +47,7 @@ public class MockLeaderElector implements ILeaderElector {
}
@Override
- public void removeFromLeaderLockQueue() throws Exception {
+ public void quitElectionFor(int delayMs) throws Exception {
//NOOP
}
diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
index 4458758..77d9d01 100644
--- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
+++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java
@@ -171,7 +171,7 @@ public class LocalFsBlobStore extends BlobStore {
sync.setZookeeperKeySet(zkKeys);
sync.setZkClient(zkClient);
sync.syncBlobs();
- } //else not leader (NOOP)
+ } //else leader (NOOP)
} //else local (NOOP)
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index c4edaa7..07bba66 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -455,7 +455,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private MetricStore metricsStore;
private IAuthorizer authorizationHandler;
//Cached CuratorFramework, mainly used for BlobStore.
- private CuratorFramework zkClient;
+ private final CuratorFramework zkClient;
//Cached topology -> executor ids, used for deciding timeout workers of heartbeatsCache.
private AtomicReference<Map<String, Set<List<Integer>>>> idToExecutors;
//May be null if worker tokens are not supported by the thrift transport.
@@ -4625,9 +4625,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
if (actionNotifier != null) {
actionNotifier.cleanup();
}
- if (zkClient != null) {
- zkClient.close();
- }
+ zkClient.close();
if (metricsStore != null) {
metricsStore.close();
}
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index 2e1a6ca..bab42ce 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -23,6 +23,7 @@ import javax.security.auth.Subject;
import com.codahale.metrics.Meter;
import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.cluster.ClusterUtils;
@@ -39,6 +40,7 @@ import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.slf4j.Logger;
@@ -60,31 +62,35 @@ public class LeaderListenerCallback {
private final TopoCache tc;
private final IStormClusterState clusterState;
private final CuratorFramework zk;
- private final LeaderLatch leaderLatch;
+ private final ILeaderElector leaderElector;
private final Map conf;
private final List<ACL> acls;
+ private final int requeueDelayMs;
/**
* Constructor for {@LeaderListenerCallback}.
* @param conf config
* @param zk zookeeper CuratorFramework client
- * @param leaderLatch LeaderLatch
* @param blobStore BlobStore
+ * @param leaderElector Leader elector
* @param tc TopoCache
* @param clusterState IStormClusterState
* @param acls zookeeper acls
*/
- public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderLatch, BlobStore blobStore,
+ public LeaderListenerCallback(Map conf, CuratorFramework zk, BlobStore blobStore, ILeaderElector leaderElector,
TopoCache tc, IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
this.blobStore = blobStore;
this.tc = tc;
this.clusterState = clusterState;
this.zk = zk;
- this.leaderLatch = leaderLatch;
+ this.leaderElector = leaderElector;
this.conf = conf;
this.acls = acls;
this.numGainedLeader = metricsRegistry.registerMeter("nimbus:num-gained-leadership");
this.numLostLeader = metricsRegistry.registerMeter("nimbus:num-lost-leadership");
+ //Since we only give up leadership if we're waiting for blobs to sync,
+ //it makes sense to wait a full sync cycle before trying for leadership again.
+ this.requeueDelayMs = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_CODE_SYNC_FREQ_SECS))*1000;
}
/**
@@ -129,11 +135,11 @@ public class LeaderListenerCallback {
} else {
LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, "
+ "giving up leadership.");
- closeLatch();
+ surrenderLeadership();
}
} else {
LOG.info("code for all active topologies not available locally, giving up leadership.");
- closeLatch();
+ surrenderLeadership();
}
}
@@ -218,11 +224,11 @@ public class LeaderListenerCallback {
return activeTopologyDependencies;
}
- private void closeLatch() {
+ private void surrenderLeadership() {
try {
- leaderLatch.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ leaderElector.quitElectionFor(requeueDelayMs);
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
index 6f53baa..7e887fb 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderElectorImp.java
@@ -18,54 +18,32 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.storm.blobstore.BlobStore;
-import org.apache.storm.cluster.IStormClusterState;
-import org.apache.storm.daemon.nimbus.TopoCache;
-import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.StormTimer;
import org.apache.storm.nimbus.ILeaderElector;
-import org.apache.storm.nimbus.LeaderListenerCallback;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.Participant;
-import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LeaderElectorImp implements ILeaderElector {
- private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
- private final Map<String, Object> conf;
- private final List<String> servers;
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
private final CuratorFramework zk;
- private final String leaderlockPath;
+ private final String leaderLockPath = "/leader-lock";
private final String id;
private final AtomicReference<LeaderLatch> leaderLatch;
- private final AtomicReference<LeaderLatchListener> leaderLatchListener;
- private final BlobStore blobStore;
- private final TopoCache tc;
- private final IStormClusterState clusterState;
- private final List<ACL> acls;
- private final StormMetricsRegistry metricsRegistry;
+ private final LeaderListenerCallbackFactory leaderListenerCallbackFactory;
+ private final StormTimer timer;
- public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
- AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
- BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
- StormMetricsRegistry metricsRegistry) {
- this.conf = conf;
- this.servers = servers;
+ public LeaderElectorImp(CuratorFramework zk, String id, LeaderListenerCallbackFactory leaderListenerCallbackFactory) {
this.zk = zk;
- this.leaderlockPath = leaderlockPath;
this.id = id;
- this.leaderLatch = leaderLatch;
- this.leaderLatchListener = leaderLatchListener;
- this.blobStore = blobStore;
- this.tc = tc;
- this.clusterState = clusterState;
- this.acls = acls;
- this.metricsRegistry = metricsRegistry;
+ this.leaderLatch = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
+ this.leaderListenerCallbackFactory = leaderListenerCallbackFactory;
+ this.timer = new StormTimer("leader-elector-timer", Utils.createDefaultUncaughtExceptionHandler());
}
@Override
@@ -75,17 +53,17 @@ public class LeaderElectorImp implements ILeaderElector {
@Override
public void addToLeaderLockQueue() throws Exception {
- // if this latch is already closed, we need to create new instance.
+ // if this latch is closed, we need to create new instance.
if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
- leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
- LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
- metricsRegistry);
- leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
- LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
+ LeaderLatch latch = new LeaderLatch(zk, leaderLockPath, id);
+ latch.addListener(leaderListenerCallbackFactory.create(this));
+ latch.start();
+ leaderLatch.set(latch);
+ LOG.info("LeaderLatch was in closed state. Reset the leaderLatch, and queued for leader lock.");
}
- // Only if the latch is not already started we invoke start
+ // If the latch is not started yet, start it
if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
- leaderLatch.get().addListener(leaderLatchListener.get());
+ leaderLatch.get().addListener(leaderListenerCallbackFactory.create(this));
leaderLatch.get().start();
LOG.info("Queued up for leader lock.");
} else {
@@ -94,13 +72,23 @@ public class LeaderElectorImp implements ILeaderElector {
}
@Override
- // Only started latches can be closed.
- public void removeFromLeaderLockQueue() throws Exception {
+ public void quitElectionFor(int delayMs) throws Exception {
+ removeFromLeaderLockQueue();
+ timer.schedule(delayMs, () -> {
+ try {
+ addToLeaderLockQueue();
+ } catch (Exception e) {
+ throw Utils.wrapInRuntime(e);
+ }
+ }, false, 0); //Don't error if timer is shut down, happens when the elector is closed.
+ }
+
+ private void removeFromLeaderLockQueue() throws Exception {
if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
leaderLatch.get().close();
LOG.info("Removed from leader lock queue.");
} else {
- LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
+ LOG.info("Leader latch is not started so no removeFromLeaderLockQueue needed.");
}
}
@@ -135,7 +123,8 @@ public class LeaderElectorImp implements ILeaderElector {
}
@Override
- public void close() {
- //Do nothing now.
+ public void close() throws Exception {
+ timer.close();
+ removeFromLeaderLockQueue();
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java
new file mode 100644
index 0000000..28d7882
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/LeaderListenerCallbackFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.zookeeper;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.nimbus.TopoCache;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.LeaderListenerCallback;
+import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
+import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LeaderListenerCallbackFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallbackFactory.class);
+
+ private final Map<String, Object> conf;
+ private final CuratorFramework zk;
+ private final BlobStore blobStore;
+ private final TopoCache tc;
+ private final IStormClusterState clusterState;
+ private final List<ACL> acls;
+ private final StormMetricsRegistry metricsRegistry;
+
+ public LeaderListenerCallbackFactory(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore, TopoCache tc,
+ IStormClusterState clusterState, List<ACL> acls, StormMetricsRegistry metricsRegistry) {
+ this.conf = conf;
+ this.zk = zk;
+ this.blobStore = blobStore;
+ this.tc = tc;
+ this.clusterState = clusterState;
+ this.acls = acls;
+ this.metricsRegistry = metricsRegistry;
+ }
+
+ public LeaderLatchListener create(ILeaderElector elector) throws UnknownHostException {
+ final LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, blobStore, elector,
+ tc, clusterState, acls, metricsRegistry);
+ final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ return new LeaderLatchListener() {
+
+ @Override
+ public void isLeader() {
+ callback.leaderCallBack();
+ LOG.info("{} gained leadership.", hostName);
+ }
+
+ @Override
+ public void notLeader() {
+ LOG.info("{} lost leadership.", hostName);
+ //Just to be sure
+ callback.notLeaderCallback();
+ }
+ };
+ }
+
+}
diff --git a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
index 5468573..9171038 100644
--- a/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-server/src/main/java/org/apache/storm/zookeeper/Zookeeper.java
@@ -20,24 +20,17 @@ package org.apache.storm.zookeeper;
import java.io.File;
import java.net.BindException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
-import org.apache.storm.Config;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.nimbus.TopoCache;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.nimbus.ILeaderElector;
-import org.apache.storm.nimbus.LeaderListenerCallback;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.storm.shade.org.apache.curator.framework.recipes.leader.Participant;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory;
@@ -112,27 +105,6 @@ public class Zookeeper {
return nimbusInfo;
}
- // Leader latch listener that will be invoked when we either gain or lose leadership
- public static LeaderLatchListener leaderLatchListenerImpl(final LeaderListenerCallback callback)
- throws UnknownHostException {
- final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
- return new LeaderLatchListener() {
-
- @Override
- public void isLeader() {
- callback.leaderCallBack();
- LOG.info("{} gained leadership.", hostName);
- }
-
- @Override
- public void notLeader() {
- LOG.info("{} lost leadership.", hostName);
- //Just to be sure
- callback.notLeaderCallback();
- }
- };
- }
-
/**
* Get master leader elector.
*
@@ -143,28 +115,19 @@ public class Zookeeper {
* @param clusterState {@link IStormClusterState}
* @param acls ACLs
* @return Instance of {@link ILeaderElector}
- *
- * @throws UnknownHostException
*/
public static ILeaderElector zkLeaderElector(Map<String, Object> conf, CuratorFramework zkClient, BlobStore blobStore,
final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
- StormMetricsRegistry metricsRegistry) throws UnknownHostException {
+ StormMetricsRegistry metricsRegistry) {
return _instance.zkLeaderElectorImpl(conf, zkClient, blobStore, tc, clusterState, acls, metricsRegistry);
}
protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, CuratorFramework zk, BlobStore blobStore,
final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
- StormMetricsRegistry metricsRegistry) throws
- UnknownHostException {
- List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
- String leaderLockPath = "/leader-lock";
+ StormMetricsRegistry metricsRegistry) {
String id = NimbusInfo.fromConf(conf).toHostPortString();
- AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
- AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
- new AtomicReference<>(leaderLatchListenerImpl(
- new LeaderListenerCallback(conf, zk, leaderLatchAtomicReference.get(), blobStore, tc, clusterState, acls, metricsRegistry)));
- return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
- leaderLatchListenerAtomicReference, blobStore, tc, clusterState, acls, metricsRegistry);
+ return new LeaderElectorImp(zk, id,
+ new LeaderListenerCallbackFactory(conf, zk, blobStore, tc, clusterState, acls, metricsRegistry));
}
}