You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/09/30 20:47:38 UTC

[nifi] branch master updated: NIFI-6589: This closes #3670. Cache results from ZooKeeper when determining the leader. NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no more than 5 seconds per review feedback

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fbd6200  NIFI-6589: This closes #3670. Cache results from ZooKeeper when determining the leader. NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no more than 5 seconds per review feedback
fbd6200 is described below

commit fbd6200ab3e4410fd9cf05f31348ec56a89d5af7
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Aug 26 11:16:30 2019 -0400

    NIFI-6589: This closes #3670. Cache results from zookeeper when determining the leader
    NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no more than 5 seconds per review feedback
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../heartbeat/AbstractHeartbeatMonitor.java        |  6 +---
 .../coordination/node/NodeClusterCoordinator.java  |  3 +-
 .../heartbeat/TestAbstractHeartbeatMonitor.java    |  2 +-
 .../election/CuratorLeaderElectionManager.java     | 39 ++++++++++++++++++----
 .../nifi/integration/parameters/ParametersIT.java  |  2 ++
 .../web/StandardNiFiWebConfigurationContext.java   | 10 +++---
 6 files changed, 42 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 5fbe3f8..86b4cc1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -111,9 +111,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
         return clusterCoordinator;
     }
 
-    protected long getHeartbeatInterval(final TimeUnit timeUnit) {
-        return timeUnit.convert(heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
-    }
 
     /**
      * Fetches all of the latest heartbeats and updates the Cluster Coordinator
@@ -122,8 +119,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
      * Visible for testing.
      */
     protected synchronized void monitorHeartbeats() {
-        final NodeIdentifier activeCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
-        if (activeCoordinator != null && !activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+        if (!clusterCoordinator.isActiveClusterCoordinator()) {
             // Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
             // on a very large delay. So before we kick the node out of the cluster, we want to first check what the
             // ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 2ca8ef8..aec3a7a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -784,8 +784,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
 
     @Override
     public boolean isActiveClusterCoordinator() {
-        final NodeIdentifier self = getLocalNodeIdentifier();
-        return self != null && self.equals(getElectedActiveCoordinatorNode());
+        return leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.CLUSTER_COORDINATOR);
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 4aeff7b..2245c6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -349,7 +349,7 @@ public class TestAbstractHeartbeatMonitor {
 
         @Override
         public boolean isActiveClusterCoordinator() {
-            return false;
+            return true;
         }
 
         @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index d07c776..eac9447 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -446,6 +446,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         private final String participantId;
 
         private volatile boolean leader;
+        private long leaderUpdateTimestamp = 0L;
+        private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
 
         public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
             this.roleName = roleName;
@@ -453,12 +455,34 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
             this.participantId = participantId;
         }
 
-        public boolean isLeader() {
+        public synchronized boolean isLeader() {
+            if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) {
+                try {
+                    final long start = System.nanoTime();
+                    final boolean zkLeader = verifyLeader();
+                    final long nanos = System.nanoTime() - start;
+
+                    setLeader(zkLeader);
+                    logger.debug("Took {} nanoseconds to reach out to ZooKeeper in order to check whether or not this node is currently the leader for Role '{}'. ZooKeeper reported {}",
+                        nanos, roleName, zkLeader);
+                } catch (final Exception e) {
+                    logger.warn("Attempted to reach out to ZooKeeper to determine whether or not this node is the elected leader for Role '{}' but failed to communicate with ZooKeeper. " +
+                        "Assuming that this node is not the leader.", roleName, e);
+
+                    return false;
+                }
+            }
+
             return leader;
         }
 
+        private synchronized void setLeader(final boolean leader) {
+            this.leader = leader;
+            this.leaderUpdateTimestamp = System.currentTimeMillis();
+        }
+
         @Override
-        public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+        public synchronized void stateChanged(final CuratorFramework client, final ConnectionState newState) {
             logger.info("{} Connection State changed to {}", this, newState.name());
 
             if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
@@ -466,7 +490,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
                     logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
                 }
 
-                leader = false;
+                setLeader(false);
             }
 
             super.stateChanged(client, newState);
@@ -482,18 +506,19 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
             final String leader = getLeader(roleName);
             if (leader == null) {
                 logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
-                return false;
+                setLeader(false);
             }
 
             final boolean match = leader.equals(participantId);
             logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
                 roleName, leader, participantId, match);
+            setLeader(match);
             return match;
         }
 
         @Override
         public void takeLeadership(final CuratorFramework client) throws Exception {
-            leader = true;
+            setLeader(true);
             logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
 
             if (listener != null) {
@@ -502,7 +527,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
                 } catch (final Exception e) {
                     logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
                     logger.error("", e);
-                    leader = false;
+                    setLeader(false);
                     Thread.sleep(1000L);
                     return;
                 }
@@ -547,7 +572,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
                     }
                 }
             } finally {
-                leader = false;
+                setLeader(false);
                 logger.info("{} This node is no longer leader for role '{}'", this, roleName);
 
                 if (listener != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
index 4a394ae..33d7639 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
@@ -366,4 +366,6 @@ public class ParametersIT extends FrameworkIntegrationTest {
         assertEquals(allParamNames, referencedParameters);
 
     }
+
+
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 5f45085..e24c25c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -279,16 +279,16 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
     }
 
     private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
-        final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
-        if (coordinatorNode == null) {
-            throw new NoClusterCoordinatorException();
-        }
-
         // Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
         // to the cluster nodes themselves.
         if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
             return requestReplicator.replicate(method, uri, entity, headers).awaitMergedResponse();
         } else {
+            final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+            if (coordinatorNode == null) {
+                throw new NoClusterCoordinatorException();
+            }
+
             return requestReplicator.forwardToCoordinator(coordinatorNode, method, uri, entity, headers).awaitMergedResponse();
         }
     }