You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/04/20 20:52:01 UTC
nifi git commit: NIFI-5096: Periodically poll ZooKeeper to determine
the leader for each registered role in Leader Election. This avoids a
condition whereby a node may occasionally fail to receive notification that
it is no longer the elected leader. NIF… [subject line truncated; the full commit message follows below]
Repository: nifi
Updated Branches:
refs/heads/master 262bf011e -> 54eb6bc23
NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader.
NIFI-5096: More proactively setting leadership to false when ZooKeeper/Curator ConnectionState changes
This closes #2646
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/54eb6bc2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/54eb6bc2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/54eb6bc2
Branch: refs/heads/master
Commit: 54eb6bc23211ad2b499f42e14759f3646f806d2f
Parents: 262bf01
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 19 09:05:32 2018 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Apr 20 16:51:02 2018 -0400
----------------------------------------------------------------------
.../election/CuratorLeaderElectionManager.java | 67 ++++++++++++++++++--
1 file changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/54eb6bc2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 54ca257..d51c79f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.controller.leader.election;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -36,6 +34,9 @@ import org.apache.zookeeper.common.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
public class CuratorLeaderElectionManager implements LeaderElectionManager {
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
@@ -112,7 +113,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
if (!isStopped()) {
- final ElectionListener electionListener = new ElectionListener(roleName, listener);
+ final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
if (isParticipant) {
leaderSelector.autoRequeue();
@@ -358,12 +359,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final String roleName;
private final LeaderElectionStateChangeListener listener;
+ private final String participantId;
private volatile boolean leader;
- public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
+ public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
this.roleName = roleName;
this.listener = listener;
+ this.participantId = participantId;
}
public boolean isLeader() {
@@ -373,9 +376,37 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
logger.info("{} Connection State changed to {}", this, newState.name());
+
+ if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
+ if (leader == true) {
+ logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
+ }
+
+ leader = false;
+ }
+
super.stateChanged(client, newState);
}
+ /**
+ * Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose
+ * its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc.
+ *
+ * @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise
+ */
+ private boolean verifyLeader() {
+ final String leader = getLeader(roleName);
+ if (leader == null) {
+ logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
+ return false;
+ }
+
+ final boolean match = leader.equals(participantId);
+ logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
+ roleName, leader, participantId, match);
+ return match;
+ }
+
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
leader = true;
@@ -396,7 +427,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
// Curator API states that we lose the leadership election when we return from this method,
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
try {
- while (!isStopped()) {
+ int failureCount = 0;
+ int loopCount = 0;
+ while (!isStopped() && leader) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
@@ -404,6 +437,30 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
Thread.currentThread().interrupt();
return;
}
+
+ if (leader && ++loopCount % 50 == 0) {
+ // While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
+ // seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
+ // periodically to determine whether or not this node is still the elected leader.
+ try {
+ final boolean stillLeader = verifyLeader();
+ failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
+
+ if (!stillLeader) {
+ logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
+ break;
+ }
+ } catch (final Exception e) {
+ failureCount++;
+ if (failureCount > 1) {
+ logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ + "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
+ } else {
+ logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ + "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
+ }
+ }
+ }
}
} finally {
leader = false;