You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/09/30 20:47:38 UTC
[nifi] branch master updated: NIFI-6589: This closes #3670. Cache
results from zookeeper when determining the leader NIFI-6589: Updated
CuratorLeaderElectionManager to cache results for no more than 5 seconds
per review feedback
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new fbd6200 NIFI-6589: This closes #3670. Cache results from zookeeper when determining the leader NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no more than 5 seconds per review feedback
fbd6200 is described below
commit fbd6200ab3e4410fd9cf05f31348ec56a89d5af7
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Aug 26 11:16:30 2019 -0400
NIFI-6589: This closes #3670. Cache results from zookeeper when determining the leader
NIFI-6589: Updated CuratorLeaderElectionManager to cache results for no more than 5 seconds per review feedback
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../heartbeat/AbstractHeartbeatMonitor.java | 6 +---
.../coordination/node/NodeClusterCoordinator.java | 3 +-
.../heartbeat/TestAbstractHeartbeatMonitor.java | 2 +-
.../election/CuratorLeaderElectionManager.java | 39 ++++++++++++++++++----
.../nifi/integration/parameters/ParametersIT.java | 2 ++
.../web/StandardNiFiWebConfigurationContext.java | 10 +++---
6 files changed, 42 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 5fbe3f8..86b4cc1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -111,9 +111,6 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
return clusterCoordinator;
}
- protected long getHeartbeatInterval(final TimeUnit timeUnit) {
- return timeUnit.convert(heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
- }
/**
* Fetches all of the latest heartbeats and updates the Cluster Coordinator
@@ -122,8 +119,7 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
* Visible for testing.
*/
protected synchronized void monitorHeartbeats() {
- final NodeIdentifier activeCoordinator = clusterCoordinator.getElectedActiveCoordinatorNode();
- if (activeCoordinator != null && !activeCoordinator.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+ if (!clusterCoordinator.isActiveClusterCoordinator()) {
// Occasionally Curator appears to not notify us that we have lost the elected leader role, or does so
// on a very large delay. So before we kick the node out of the cluster, we want to first check what the
// ZNode in ZooKeeper says, and ensure that this is the node that is being advertised as the appropriate
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 2ca8ef8..aec3a7a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -784,8 +784,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
@Override
public boolean isActiveClusterCoordinator() {
- final NodeIdentifier self = getLocalNodeIdentifier();
- return self != null && self.equals(getElectedActiveCoordinatorNode());
+ return leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.CLUSTER_COORDINATOR);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 4aeff7b..2245c6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -349,7 +349,7 @@ public class TestAbstractHeartbeatMonitor {
@Override
public boolean isActiveClusterCoordinator() {
- return false;
+ return true;
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index d07c776..eac9447 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -446,6 +446,8 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
private final String participantId;
private volatile boolean leader;
+ private long leaderUpdateTimestamp = 0L;
+ private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
this.roleName = roleName;
@@ -453,12 +455,34 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
this.participantId = participantId;
}
- public boolean isLeader() {
+ public synchronized boolean isLeader() {
+ if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) {
+ try {
+ final long start = System.nanoTime();
+ final boolean zkLeader = verifyLeader();
+ final long nanos = System.nanoTime() - start;
+
+ setLeader(zkLeader);
+ logger.debug("Took {} nanoseconds to reach out to ZooKeeper in order to check whether or not this node is currently the leader for Role '{}'. ZooKeeper reported {}",
+ nanos, roleName, zkLeader);
+ } catch (final Exception e) {
+ logger.warn("Attempted to reach out to ZooKeeper to determine whether or not this node is the elected leader for Role '{}' but failed to communicate with ZooKeeper. " +
+ "Assuming that this node is not the leader.", roleName, e);
+
+ return false;
+ }
+ }
+
return leader;
}
+ private synchronized void setLeader(final boolean leader) {
+ this.leader = leader;
+ this.leaderUpdateTimestamp = System.currentTimeMillis();
+ }
+
@Override
- public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+ public synchronized void stateChanged(final CuratorFramework client, final ConnectionState newState) {
logger.info("{} Connection State changed to {}", this, newState.name());
if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
@@ -466,7 +490,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
}
- leader = false;
+ setLeader(false);
}
super.stateChanged(client, newState);
@@ -482,18 +506,19 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
final String leader = getLeader(roleName);
if (leader == null) {
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
- return false;
+ setLeader(false);
}
final boolean match = leader.equals(participantId);
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
roleName, leader, participantId, match);
+ setLeader(match);
return match;
}
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
- leader = true;
+ setLeader(true);
logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
if (listener != null) {
@@ -502,7 +527,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
} catch (final Exception e) {
logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
logger.error("", e);
- leader = false;
+ setLeader(false);
Thread.sleep(1000L);
return;
}
@@ -547,7 +572,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
}
}
} finally {
- leader = false;
+ setLeader(false);
logger.info("{} This node is no longer leader for role '{}'", this, roleName);
if (listener != null) {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
index 4a394ae..33d7639 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/parameters/ParametersIT.java
@@ -366,4 +366,6 @@ public class ParametersIT extends FrameworkIntegrationTest {
assertEquals(allParamNames, referencedParameters);
}
+
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
index 5f45085..e24c25c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiWebConfigurationContext.java
@@ -279,16 +279,16 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
}
private NodeResponse replicate(final String method, final URI uri, final Object entity, final Map<String, String> headers) throws InterruptedException {
- final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
- if (coordinatorNode == null) {
- throw new NoClusterCoordinatorException();
- }
-
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly
// to the cluster nodes themselves.
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
return requestReplicator.replicate(method, uri, entity, headers).awaitMergedResponse();
} else {
+ final NodeIdentifier coordinatorNode = clusterCoordinator.getElectedActiveCoordinatorNode();
+ if (coordinatorNode == null) {
+ throw new NoClusterCoordinatorException();
+ }
+
return requestReplicator.forwardToCoordinator(coordinatorNode, method, uri, entity, headers).awaitMergedResponse();
}
}