You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2018/04/20 20:52:01 UTC

nifi git commit: NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader. NIFI-5096: More proactively setting leadership to false when ZooKeeper/Curator ConnectionState changes

Repository: nifi
Updated Branches:
  refs/heads/master 262bf011e -> 54eb6bc23


NIFI-5096: Periodically poll ZooKeeper to determine the leader for each registered role in Leader Election. This avoids a condition whereby a node may occasionally fail to receive notification that it is no longer the elected leader.
NIFI-5096: More proactively setting leadership to false when ZooKeeper/Curator ConnectionState changes

This closes #2646


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/54eb6bc2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/54eb6bc2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/54eb6bc2

Branch: refs/heads/master
Commit: 54eb6bc23211ad2b499f42e14759f3646f806d2f
Parents: 262bf01
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Apr 19 09:05:32 2018 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Fri Apr 20 16:51:02 2018 -0400

----------------------------------------------------------------------
 .../election/CuratorLeaderElectionManager.java  | 67 ++++++++++++++++++--
 1 file changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/54eb6bc2/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 54ca257..d51c79f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.controller.leader.election;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -36,6 +34,9 @@ import org.apache.zookeeper.common.PathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class CuratorLeaderElectionManager implements LeaderElectionManager {
 
     private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
@@ -112,7 +113,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
 
         if (!isStopped()) {
-            final ElectionListener electionListener = new ElectionListener(roleName, listener);
+            final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
             final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
             if (isParticipant) {
                 leaderSelector.autoRequeue();
@@ -358,12 +359,14 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
 
         private final String roleName;
         private final LeaderElectionStateChangeListener listener;
+        private final String participantId;
 
         private volatile boolean leader;
 
-        public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
+        public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
             this.roleName = roleName;
             this.listener = listener;
+            this.participantId = participantId;
         }
 
         public boolean isLeader() {
@@ -373,9 +376,37 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         @Override
         public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
             logger.info("{} Connection State changed to {}", this, newState.name());
+
+            if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
+                if (leader == true) {
+                    logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
+                }
+
+                leader = false;
+            }
+
             super.stateChanged(client, newState);
         }
 
+        /**
+         * Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose
+         * its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc.
+         *
+         * @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise
+         */
+        private boolean verifyLeader() {
+            final String leader = getLeader(roleName);
+            if (leader == null) {
+                logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
+                return false;
+            }
+
+            final boolean match = leader.equals(participantId);
+            logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
+                roleName, leader, participantId, match);
+            return match;
+        }
+
         @Override
         public void takeLeadership(final CuratorFramework client) throws Exception {
             leader = true;
@@ -396,7 +427,9 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
             // Curator API states that we lose the leadership election when we return from this method,
             // so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
             try {
-                while (!isStopped()) {
+                int failureCount = 0;
+                int loopCount = 0;
+                while (!isStopped() && leader) {
                     try {
                         Thread.sleep(100L);
                     } catch (final InterruptedException ie) {
@@ -404,6 +437,30 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
                         Thread.currentThread().interrupt();
                         return;
                     }
+
+                    if (leader && ++loopCount % 50 == 0) {
+                        // While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
+                        // seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
+                        // periodically to determine whether or not this node is still the elected leader.
+                        try {
+                            final boolean stillLeader = verifyLeader();
+                            failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
+
+                            if (!stillLeader) {
+                                logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
+                                break;
+                            }
+                        } catch (final Exception e) {
+                            failureCount++;
+                            if (failureCount > 1) {
+                                logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+                                    + "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
+                            } else {
+                                logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+                                    + "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
+                            }
+                        }
+                    }
                 }
             } finally {
                 leader = false;