You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:52 UTC
[16/18] nifi git commit: NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301
NIFI-483: Use ZooKeeper's Leader Election to determine Primary Node. This closes #301
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1ac05266
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1ac05266
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1ac05266
Branch: refs/heads/master
Commit: 1ac05266a5cbab0c05551fc40201376cca13b540
Parents: 0d3bd2c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Mar 23 13:16:13 2016 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Mon Apr 4 11:47:08 2016 -0400
----------------------------------------------------------------------
nifi-assembly/pom.xml | 6 +
.../org/apache/nifi/util/NiFiProperties.java | 9 +
.../protocol/ClusterManagerProtocolSender.java | 9 -
.../impl/ClusterManagerProtocolSenderImpl.java | 27 --
.../ClusterManagerProtocolSenderListener.java | 7 -
.../protocol/jaxb/message/ObjectFactory.java | 5 -
.../message/PrimaryRoleAssignmentMessage.java | 55 ----
.../protocol/message/ProtocolMessage.java | 1 -
.../nifi/cluster/manager/ClusterManager.java | 18 --
.../cluster/manager/impl/WebClusterManager.java | 199 +------------
.../nifi-framework/nifi-framework-core/pom.xml | 9 +
.../apache/nifi/controller/FlowController.java | 103 ++++---
.../nifi/controller/StandardFlowService.java | 40 ---
.../election/CuratorLeaderElectionManager.java | 285 +++++++++++++++++++
.../leader/election/LeaderElectionManager.java | 71 +++++
.../LeaderElectionStateChangeListener.java | 35 +++
.../src/main/resources/conf/nifi.properties | 6 +
.../nifi/web/StandardNiFiServiceFacade.java | 6 -
pom.xml | 13 +
19 files changed, 495 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 0422b8e..09a8d50 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -456,6 +456,12 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.manager.protocol.threads>10</nifi.cluster.manager.protocol.threads>
<nifi.cluster.manager.safemode.duration>0 sec</nifi.cluster.manager.safemode.duration>
+ <!-- nifi.properties: zookeeper properties -->
+ <nifi.zookeeper.connect.string></nifi.zookeeper.connect.string>
+ <nifi.zookeeper.connect.timeout>3 secs</nifi.zookeeper.connect.timeout>
+ <nifi.zookeeper.session.timeout>3 secs</nifi.zookeeper.session.timeout>
+ <nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
+
<!-- nifi.properties: kerberos properties -->
<nifi.kerberos.krb5.file> </nifi.kerberos.krb5.file>
<nifi.kerberos.service.principal />
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index 8c98c0b..517b19a 100644
--- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -174,6 +174,12 @@ public class NiFiProperties extends Properties {
public static final String CLUSTER_NODE_UNICAST_MANAGER_ADDRESS = "nifi.cluster.node.unicast.manager.address";
public static final String CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT = "nifi.cluster.node.unicast.manager.protocol.port";
+ // zookeeper properties
+ public static final String ZOOKEEPER_CONNECT_STRING = "nifi.zookeeper.connect.string";
+ public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
+ public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
+ public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
+
// cluster manager properties
public static final String CLUSTER_IS_MANAGER = "nifi.cluster.is.manager";
public static final String CLUSTER_MANAGER_ADDRESS = "nifi.cluster.manager.address";
@@ -226,6 +232,9 @@ public class NiFiProperties extends Properties {
public static final String DEFAULT_PERSISTENT_STATE_DIRECTORY = "./conf/state";
public static final String DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY = "5 mins";
public static final String DEFAULT_BORED_YIELD_DURATION = "10 millis";
+ public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
+ public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
+ public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
// cluster common defaults
public static final String DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL = "5 sec";
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
index 10653ff..bdefbbf 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
@@ -19,7 +19,6 @@ package org.apache.nifi.cluster.protocol;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
@@ -57,14 +56,6 @@ public interface ClusterManagerProtocolSender {
void disconnect(DisconnectMessage msg) throws ProtocolException;
/**
- * Sends an "assign primary role" message to a node.
- *
- * @param msg a message
- * @throws ProtocolException if communication failed
- */
- void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException;
-
- /**
* Sets the {@link BulletinRepository} that can be used to report bulletins
*
* @param bulletinRepository repo
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
index 636a6d3..fb9292e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
@@ -31,7 +31,6 @@ import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -56,7 +55,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
private final ProtocolContext<ProtocolMessage> protocolContext;
private final SocketConfiguration socketConfiguration;
private int handshakeTimeoutSeconds;
- private volatile BulletinRepository bulletinRepository;
public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
if (socketConfiguration == null) {
@@ -71,7 +69,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
@Override
public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- this.bulletinRepository = bulletinRepository;
}
/**
@@ -183,30 +180,6 @@ public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolS
}
}
- /**
- * Assigns the primary role to a node.
- *
- * @param msg a message
- *
- * @throws ProtocolException if the message failed to be sent
- */
- @Override
- public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
- Socket socket = null;
- try {
- socket = createSocket(msg.getNodeId(), true);
-
- try {
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch (final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
// update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
index 8eb83a4..54d33a8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
@@ -26,7 +26,6 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.reporting.BulletinRepository;
@@ -108,10 +107,4 @@ public class ClusterManagerProtocolSenderListener implements ClusterManagerProto
public void disconnect(DisconnectMessage msg) throws ProtocolException {
sender.disconnect(msg);
}
-
- @Override
- public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
- sender.assignPrimaryRole(msg);
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 516b67e..25041ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -27,7 +27,6 @@ import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.MulticastProtocolMessage;
import org.apache.nifi.cluster.protocol.message.PingMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
@@ -92,8 +91,4 @@ public class ObjectFactory {
public ControllerStartupFailureMessage createControllerStartupFailureMessage() {
return new ControllerStartupFailureMessage();
}
-
- public PrimaryRoleAssignmentMessage createPrimaryRoleAssignmentMessage() {
- return new PrimaryRoleAssignmentMessage();
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
deleted file mode 100644
index 4b7563a..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/PrimaryRoleAssignmentMessage.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.message;
-
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-
-/**
- */
-@XmlRootElement(name = "primaryRoleAssignmentMessage")
-public class PrimaryRoleAssignmentMessage extends ProtocolMessage {
-
- private NodeIdentifier nodeId;
-
- private boolean primary;
-
- @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
- public NodeIdentifier getNodeId() {
- return nodeId;
- }
-
- public void setNodeId(NodeIdentifier nodeId) {
- this.nodeId = nodeId;
- }
-
- public boolean isPrimary() {
- return primary;
- }
-
- public void setPrimary(boolean primary) {
- this.primary = primary;
- }
-
- @Override
- public MessageType getType() {
- return MessageType.PRIMARY_ROLE;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index f01efd8..5953e09 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -31,7 +31,6 @@ public abstract class ProtocolMessage {
FLOW_RESPONSE,
HEARTBEAT,
PING,
- PRIMARY_ROLE,
RECONNECTION_REQUEST,
RECONNECTION_RESPONSE,
SERVICE_BROADCAST,
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 51de54b..27ada88 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -23,9 +23,7 @@ import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
@@ -142,22 +140,6 @@ public interface ClusterManager extends NodeInformant {
List<Event> getNodeEvents(final String nodeId);
/**
- * Revokes the primary role from the current primary node and assigns the primary role to given given node ID.
- *
- * If role revocation fails, then the current primary node is set to disconnected while retaining the primary role and no role assignment is performed.
- *
- * If role assignment fails, then the given node is set to disconnected and is given the primary role.
- *
- * @param nodeId the node identifier
- * @param userDn the Distinguished Name of the user requesting that the Primary Node be assigned
- *
- * @throws UnknownNodeException if the node with the given identifier does not exist
- * @throws IneligiblePrimaryNodeException if the node with the given identifier is not eligible to be the primary node
- * @throws PrimaryRoleAssignmentException if the cluster was unable to change the primary role to the requested node
- */
- void setPrimaryNode(String nodeId, String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException;
-
- /**
* @return the primary node of the cluster or null if no primary node exists
*/
Node getPrimaryNode();
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 303e98e..6f1bc2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -93,12 +93,10 @@ import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
@@ -118,7 +116,6 @@ import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
@@ -551,9 +548,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
servicesBroadcaster.start();
}
- // start in safe mode
- executeSafeModeTask();
-
// Load and start running Reporting Tasks
final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
@@ -1312,88 +1306,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- /**
- * Messages the node to have the primary role. If the messaging fails, then the node is marked as disconnected.
- *
- * @param nodeId the node ID to assign primary role
- *
- * @return true if primary role assigned; false otherwise
- */
- private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
- writeLock.lock();
- try {
- // create primary role message
- final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
- msg.setNodeId(nodeId);
- msg.setPrimary(true);
- logger.info("Attempting to assign primary role to node: " + nodeId);
-
- // message
- senderListener.assignPrimaryRole(msg);
-
- logger.info("Assigned primary role to node: " + nodeId);
- addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
-
- // true indicates primary role assigned
- return true;
-
- } catch (final ProtocolException ex) {
-
- logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
- addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
-
- // mark node as disconnected and log/record the event
- final Node node = getRawNode(nodeId.getId());
- node.setStatus(Status.DISCONNECTED);
- addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
-
- addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
-
- // false indicates primary role failed to be assigned
- return false;
- } finally {
- writeLock.unlock("assignPrimaryRole");
- }
- }
-
- /**
- * Messages the node with the given node ID to no longer have the primary role. If the messaging fails, then the node is marked as disconnected.
- *
- * @return true if the primary role was revoked from the node; false otherwise
- */
- private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
- writeLock.lock();
- try {
- // create primary role message
- final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
- msg.setNodeId(nodeId);
- msg.setPrimary(false);
- logger.info("Attempting to revoke primary role from node: " + nodeId);
-
- // send message
- senderListener.assignPrimaryRole(msg);
-
- logger.info("Revoked primary role from node: " + nodeId);
- addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
-
- // true indicates primary role was revoked
- return true;
- } catch (final ProtocolException ex) {
-
- logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
-
- // mark node as disconnected and log/record the event
- final Node node = getRawNode(nodeId.getId());
- node.setStatus(Status.DISCONNECTED);
- addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
- addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
-
- // false indicates primary role failed to be revoked
- return false;
- } finally {
- writeLock.unlock("revokePrimaryRole");
- }
- }
private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort(),
@@ -1778,12 +1690,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// get a raw reference to the node (if it doesn't exist, node will be null)
node = getRawNode(resolvedNodeIdentifier.getId());
- // if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
- if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
- addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node.");
- revokePrimaryRole(resolvedNodeIdentifier);
- }
-
final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
@@ -1871,6 +1777,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// record heartbeat
node.setHeartbeat(mostRecentHeartbeat);
+
+ if (mostRecentHeartbeat.isPrimary()) {
+ setPrimaryNodeId(node.getNodeId());
+ }
}
} catch (final Exception e) {
logger.error("Failed to process heartbeat from {}:{} due to {}",
@@ -1984,47 +1894,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- @Override
- public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException {
- writeLock.lock();
- try {
-
- final Node node = getNode(nodeId);
- if (node == null) {
- throw new UnknownNodeException("Node does not exist.");
- } else if (Status.CONNECTED != node.getStatus()) {
- throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node.");
- }
-
- // revoke primary role
- final Node primaryNode;
- if ((primaryNode = getPrimaryNode()) != null) {
- if (primaryNode.getStatus() == Status.DISCONNECTED) {
- throw new PrimaryRoleAssignmentException("A disconnected, primary node exists. Delete the node before assigning the primary role to a different node.");
- } else if (revokePrimaryRole(primaryNode.getNodeId())) {
- addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment.");
- } else {
- throw new PrimaryRoleAssignmentException(
- "Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node.");
- }
- }
-
- // change the primary node ID to the given node
- setPrimaryNodeId(node.getNodeId());
-
- // assign primary role
- if (assignPrimaryRole(node.getNodeId())) {
- addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn);
- addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn);
- } else {
- throw new PrimaryRoleAssignmentException(
- "Cluster manager assigned primary role to node, but the node failed to accept the assignment. Cluster manager disconnected node.");
- }
- } finally {
- writeLock.unlock("setPrimaryNode");
- }
- }
-
private int getClusterProtocolHeartbeatSeconds() {
return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS);
}
@@ -4508,66 +4377,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- private void executeSafeModeTask() {
-
- new Thread(new Runnable() {
-
- private final long threadStartTime = System.currentTimeMillis();
-
- @Override
- public void run() {
- logger.info("Entering safe mode...");
- final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
- final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
- boolean exitSafeMode = false;
- while (isRunning()) {
-
- writeLock.lock();
- try {
-
- final long currentTime = System.currentTimeMillis();
- if (timeToElect < currentTime) {
- final Set<NodeIdentifier> connectedNodeIds = getNodeIds(Status.CONNECTED);
- if (!connectedNodeIds.isEmpty()) {
- // get first connected node ID
- final NodeIdentifier connectedNodeId = connectedNodeIds.iterator().next();
- if (assignPrimaryRole(connectedNodeId)) {
- try {
- setPrimaryNodeId(connectedNodeId);
- exitSafeMode = true;
- } catch (final DaoException de) {
- final String message = String.format("Failed to persist primary node ID '%s' in cluster dataflow.", connectedNodeId);
- logger.warn(message);
- addBulletin(connectedNodeId, Severity.WARNING, message);
- revokePrimaryRole(connectedNodeId);
- }
- }
- }
- }
-
- if (!isInSafeMode()) {
- // a primary node has been selected outside of this thread
- exitSafeMode = true;
- logger.info("Exiting safe mode because " + primaryNodeId + " has been assigned the primary role.");
- break;
- }
- } finally {
- writeLock.unlock("executeSafeModeTask");
- }
-
- if (!exitSafeMode) {
- // sleep for a bit
- try {
- Thread.sleep(1000);
- } catch (final InterruptedException ie) {
- return;
- }
- }
-
- }
- }
- }).start();
- }
/**
* This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index dc5b7d3..ff3ecba 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -131,6 +131,15 @@
<dependency>
<groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 632fa1a..9f14354 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -83,6 +83,9 @@ import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
+import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionManager;
+import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
@@ -231,6 +234,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String DEFAULT_ROOT_GROUP_NAME = "NiFi Flow";
+ public static final String PRIMARY_NODE_ROLE_NAME = "primary-node";
private final AtomicInteger maxTimerDrivenThreads;
private final AtomicInteger maxEventDrivenThreads;
@@ -277,6 +281,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private ProcessGroup rootGroup;
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
+ private final LeaderElectionManager leaderElectionManager;
+
/**
* true if controller is configured to operate in a clustered environment
@@ -329,12 +335,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// guarded by rwLock
/**
- * true if controller is the primary of the cluster
- */
- private boolean primary;
-
- // guarded by rwLock
- /**
* true if connected to a cluster
*/
private boolean connected;
@@ -527,6 +527,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+ if (configuredForClustering) {
+ leaderElectionManager = new CuratorLeaderElectionManager(4);
+ } else {
+ leaderElectionManager = null;
+ }
}
private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ResourceClaimManager contentClaimManager) {
@@ -1159,6 +1164,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalStateException("Controller already stopped or still stopping...");
}
+ if (leaderElectionManager != null) {
+ leaderElectionManager.stop();
+ }
+
if (kill) {
this.timerDrivenEngineRef.get().shutdownNow();
this.eventDrivenEngineRef.get().shutdownNow();
@@ -1365,7 +1374,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
writeLock.unlock();
}
@@ -3119,6 +3128,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the bulletin repository
if (isChanging) {
if (clustered) {
+ leaderElectionManager.register(PRIMARY_NODE_ROLE_NAME, new LeaderElectionStateChangeListener() {
+ @Override
+ public void onLeaderElection() {
+ setPrimary(true);
+ }
+
+ @Override
+ public void onLeaderRelinquish() {
+ setPrimary(false);
+ }
+ });
+
+ leaderElectionManager.start();
stateManagerProvider.enableClusterProvider();
if (zooKeeperStateServer != null) {
@@ -3157,6 +3179,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
}
} else {
+ leaderElectionManager.unregister(PRIMARY_NODE_ROLE_NAME);
+
if (zooKeeperStateServer != null) {
zooKeeperStateServer.shutdown();
}
@@ -3170,7 +3194,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
writeLock.unlock();
}
@@ -3180,51 +3204,38 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @return true if this instance is the primary node in the cluster; false otherwise
*/
public boolean isPrimary() {
- rwLock.readLock().lock();
- try {
- return primary;
- } finally {
- rwLock.readLock().unlock();
- }
+ return leaderElectionManager != null && leaderElectionManager.isLeader(PRIMARY_NODE_ROLE_NAME);
}
public void setPrimary(final boolean primary) {
- rwLock.writeLock().lock();
- try {
- // no update, so return
- if (this.primary == primary) {
- return;
- }
-
- LOG.info("Setting primary flag from '" + this.primary + "' to '" + primary + "'");
-
- final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
- final ProcessGroup rootGroup = getGroup(getRootGroupId());
- for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
- }
+ final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
+ final ProcessGroup rootGroup = getGroup(getRootGroupId());
+ for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
- for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
- }
+ }
+ for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
- for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
- try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
- }
+ }
+ for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
+ }
- // update primary
- this.primary = primary;
- eventDrivenWorkerQueue.setPrimary(primary);
+ // update primary
+ eventDrivenWorkerQueue.setPrimary(primary);
- // update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
- } finally {
- rwLock.writeLock().unlock();
- }
+ // update the heartbeat bean
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+
+ // Emit a bulletin detailing the fact that the primary node state has changed
+ final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
+ final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
+ bulletinRepository.addBulletin(bulletin);
}
static boolean areEqual(final String a, final String b) {
@@ -3603,7 +3614,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connected = connected;
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
} finally {
rwLock.writeLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 6250c5a..67d0338 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -55,7 +55,6 @@ import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -342,7 +341,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
case RECONNECTION_REQUEST:
case DISCONNECTION_REQUEST:
case FLOW_REQUEST:
- case PRIMARY_ROLE:
return true;
default:
return false;
@@ -381,14 +379,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}, "Disconnect from Cluster").start();
return null;
- case PRIMARY_ROLE:
- new Thread(new Runnable() {
- @Override
- public void run() {
- handlePrimaryRoleAssignment((PrimaryRoleAssignmentMessage) request);
- }
- }, "Set Primary Role Status").start();
- return null;
default:
throw new ProtocolException("Handler cannot handle message type: " + request.getType());
}
@@ -512,14 +502,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
- private void handlePrimaryRoleAssignment(final PrimaryRoleAssignmentMessage msg) {
- writeLock.lock();
- try {
- controller.setPrimary(msg.isPrimary());
- } finally {
- writeLock.unlock();
- }
- }
private void handleReconnectionRequest(final ReconnectionRequestMessage request) {
writeLock.lock();
@@ -747,9 +729,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setConnected(true);
- // set primary
- controller.setPrimary(response.isPrimary());
-
// start the processors as indicated by the dataflow
controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
@@ -862,7 +841,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
private class SaveHolder {
-
private final Calendar saveTime;
private final boolean shouldArchive;
@@ -871,22 +849,4 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
shouldArchive = archive;
}
}
-
- public boolean isPrimary() {
- readLock.lock();
- try {
- return controller.isPrimary();
- } finally {
- readLock.unlock();
- }
- }
-
- public void setPrimary(boolean primary) {
- writeLock.lock();
- try {
- controller.setPrimary(primary);
- } finally {
- writeLock.unlock();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
new file mode 100644
index 0000000..4c0cbd0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.common.PathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link LeaderElectionManager} that uses Apache Curator's {@link LeaderSelector} recipe to
+ * take part in one ZooKeeper-based leader election per registered role.
+ *
+ * <p>The ZooKeeper connect string, session timeout, connect timeout and root node are read from
+ * {@link NiFiProperties} when the manager is constructed. The election node used for a role is
+ * {@code <root node>/leaders/<role name>}.</p>
+ *
+ * <p>Roles may be registered while the manager is stopped; they are remembered and elections
+ * for them begin when {@link #start()} is called. Every method that touches the role maps or
+ * the Curator client is synchronized on this instance.</p>
+ */
+public class CuratorLeaderElectionManager implements LeaderElectionManager {
+    private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
+
+    // Executor handed to every LeaderSelector and also used to deliver listener callbacks
+    private final FlowEngine leaderElectionMonitorEngine;
+    private final int sessionTimeoutMs;
+    private final int connectionTimeoutMs;
+    private final String rootPath;
+    private final String connectString;
+
+    // Created in start() and discarded in stop(); null while the manager is stopped
+    private CuratorFramework curatorClient;
+
+    private volatile boolean stopped = true;
+
+    // Roles that currently have a running LeaderSelector (empty while stopped)
+    private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
+    // Every role that has been registered, so that elections can be resumed by start()
+    private final Map<String, LeaderElectionStateChangeListener> registeredRoles = new HashMap<>();
+
+    /**
+     * Creates a stopped manager configured from {@link NiFiProperties}.
+     *
+     * @param threadPoolSize the number of threads in the pool that runs the leader selectors and
+     *            delivers listener notifications
+     * @throws IllegalStateException if the ZooKeeper connect string is not configured or the
+     *             configured root node is not a valid ZooKeeper path
+     */
+    public CuratorLeaderElectionManager(final int threadPoolSize) {
+        leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
+
+        final NiFiProperties properties = NiFiProperties.getInstance();
+
+        connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+        if (connectString == null || connectString.trim().isEmpty()) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
+        }
+
+        sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+        connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
+        rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
+
+        try {
+            PathUtils.validatePath(rootPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath, e);
+        }
+    }
+
+
+    /**
+     * Connects to ZooKeeper and begins an election for every role registered so far.
+     * Calling this method on a manager that is already started has no effect.
+     */
+    @Override
+    public synchronized void start() {
+        if (!stopped) {
+            return;
+        }
+
+        stopped = false;
+
+        final RetryPolicy retryPolicy = new RetryForever(5000);
+        curatorClient = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
+        curatorClient.start();
+
+        // Call #register for each already-registered role. This will
+        // cause us to start listening for leader elections for that
+        // role again. Iterate over a snapshot because #register writes
+        // to registeredRoles.
+        final Map<String, LeaderElectionStateChangeListener> rolesToResume = new HashMap<>(registeredRoles);
+        for (final Map.Entry<String, LeaderElectionStateChangeListener> entry : rolesToResume.entrySet()) {
+            register(entry.getKey(), entry.getValue());
+        }
+
+        logger.info("{} started", this);
+    }
+
+    /**
+     * Reads a time period property and converts it to milliseconds, falling back to the
+     * supplied default (with a warning) when the configured value cannot be parsed.
+     *
+     * @param properties the properties to read from
+     * @param propertyName the name of the property holding the time period
+     * @param defaultValue the time period to use when the property is unset or invalid
+     * @return the time period in milliseconds
+     */
+    private int getTimePeriod(final NiFiProperties properties, final String propertyName, final String defaultValue) {
+        final String timeout = properties.getProperty(propertyName, defaultValue);
+        try {
+            return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+        } catch (final Exception e) {
+            logger.warn("Value of '{}' property is set to '{}', which is not a valid time period. Using default of {}", propertyName, timeout, defaultValue);
+            return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS);
+        }
+    }
+
+
+    /**
+     * Registers the role without a listener; leadership can still be polled via {@link #isLeader(String)}.
+     *
+     * @param roleName the name of the role
+     */
+    @Override
+    public synchronized void register(final String roleName) {
+        register(roleName, null);
+    }
+
+
+    /**
+     * Registers the role and, if the manager is running, immediately starts an election for it.
+     * If an election for the role is already running, a warning is logged and nothing changes.
+     *
+     * @param roleName the name of the role
+     * @param listener notified when this node gains or loses leadership; may be {@code null}
+     * @throws IllegalStateException if the role name does not yield a valid ZooKeeper path
+     */
+    @Override
+    public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener) {
+        logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
+
+        if (leaderRoles.containsKey(roleName)) {
+            logger.warn("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
+            return;
+        }
+
+        // The election node lives beneath the configured root node
+        final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
+
+        // rootPath was validated in the constructor, so a failure here is caused by the role name
+        try {
+            PathUtils.validatePath(leaderPath);
+        } catch (final IllegalArgumentException e) {
+            throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name", e);
+        }
+
+        registeredRoles.put(roleName, listener);
+
+        if (!isStopped()) {
+            final ElectionListener electionListener = new ElectionListener(roleName, listener);
+            final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
+            leaderSelector.autoRequeue();
+            leaderSelector.start();
+
+            final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener);
+
+            leaderRoles.put(roleName, leaderRole);
+        }
+        logger.info("{} Registered new Leader Selector for role {}", this, roleName);
+    }
+
+    /**
+     * Forgets the role and closes its leader selector if one is running. If no election is
+     * running for the role (it was never registered, or the manager is stopped), a warning is
+     * logged instead.
+     *
+     * @param roleName the name of the role to unregister
+     */
+    @Override
+    public synchronized void unregister(final String roleName) {
+        registeredRoles.remove(roleName);
+
+        final LeaderRole leaderRole = leaderRoles.remove(roleName);
+        if (leaderRole == null) {
+            logger.warn("Cannot unregister Leader Election Role '{}' becuase that role is not registered", roleName);
+            return;
+        }
+
+        leaderRole.getLeaderSelector().close();
+    }
+
+    /**
+     * Closes every running leader selector and the ZooKeeper client. Registered roles are
+     * remembered so that a later {@link #start()} resumes their elections.
+     */
+    @Override
+    public synchronized void stop() {
+        stopped = true;
+
+        for (final LeaderRole role : leaderRoles.values()) {
+            final LeaderSelector selector = role.getLeaderSelector();
+            selector.close();
+        }
+
+        leaderRoles.clear();
+
+        if (curatorClient != null) {
+            curatorClient.close();
+            curatorClient = null;
+        }
+
+        logger.info("{} stopped and closed", this);
+    }
+
+    @Override
+    public boolean isStopped() {
+        return stopped;
+    }
+
+
+    @Override
+    public String toString() {
+        return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
+    }
+
+
+    /**
+     * @param roleName the name of the role
+     * @return {@code true} if an election is running for the role and this node currently holds
+     *         leadership, {@code false} otherwise
+     */
+    @Override
+    public synchronized boolean isLeader(final String roleName) {
+        final LeaderRole role = leaderRoles.get(roleName);
+        if (role == null) {
+            return false;
+        }
+
+        return role.isLeader();
+    }
+
+
+    /**
+     * Pairs the Curator selector for a role with the listener that tracks whether this node
+     * currently leads that role.
+     */
+    private static class LeaderRole {
+        private final LeaderSelector leaderSelector;
+        private final ElectionListener electionListener;
+
+        public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener) {
+            this.leaderSelector = leaderSelector;
+            this.electionListener = electionListener;
+        }
+
+        public LeaderSelector getLeaderSelector() {
+            return leaderSelector;
+        }
+
+        public boolean isLeader() {
+            return electionListener.isLeader();
+        }
+    }
+
+
+    /**
+     * Curator callback for a single role. Records whether this node is the leader and forwards
+     * gain/loss of leadership to the optional {@link LeaderElectionStateChangeListener}.
+     */
+    private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
+        private final String roleName;
+        private final LeaderElectionStateChangeListener listener;
+
+        private volatile boolean leader;
+
+        public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener) {
+            this.roleName = roleName;
+            this.listener = listener;
+        }
+
+        public boolean isLeader() {
+            return leader;
+        }
+
+        @Override
+        public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
+            logger.info("{} Connection State changed to {}", this, newState.name());
+            super.stateChanged(client, newState);
+        }
+
+        /**
+         * Invoked by Curator when this node wins the election. Blocks until the manager is
+         * stopped or the thread is interrupted, because returning relinquishes leadership.
+         */
+        @Override
+        public void takeLeadership(final CuratorFramework client) throws Exception {
+            leader = true;
+            logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
+
+            // Notify on a pool thread so that a slow listener cannot stall this election thread
+            if (listener != null) {
+                leaderElectionMonitorEngine.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        listener.onLeaderElection();
+                    }
+                });
+            }
+
+            // Curator API states that we lose the leadership election when we return from this method,
+            // so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
+            try {
+                while (!isStopped()) {
+                    try {
+                        Thread.sleep(1000L);
+                    } catch (final InterruptedException ie) {
+                        logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName);
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+            } finally {
+                // Runs on normal exit and on interruption alike
+                leader = false;
+                logger.info("{} This node is no longer leader for role '{}'", this, roleName);
+
+                if (listener != null) {
+                    leaderElectionMonitorEngine.submit(new Runnable() {
+                        @Override
+                        public void run() {
+                            listener.onLeaderRelinquish();
+                        }
+                    });
+                }
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
new file mode 100644
index 0000000..d16dbdb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionManager.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+/**
+ * Manages this node's participation in leader elections for a set of named roles.
+ * Roles are added with one of the <code>register</code> methods, and the outcome can be
+ * observed either by polling {@link #isLeader(String)} or through a
+ * {@link LeaderElectionStateChangeListener}.
+ */
+public interface LeaderElectionManager {
+ /**
+ * Starts managing leader elections for all registered roles
+ */
+ void start();
+
+ /**
+ * Adds a new role for which a leader is required. No listener is associated with
+ * the role, so leadership can only be observed via {@link #isLeader(String)}.
+ *
+ * @param roleName the name of the role
+ */
+ void register(String roleName);
+
+ /**
+ * Adds a new role for which a leader is required
+ *
+ * @param roleName the name of the role
+ * @param listener a listener that will be called when the node gains or relinquishes
+ * the role of leader
+ */
+ void register(String roleName, LeaderElectionStateChangeListener listener);
+
+ /**
+ * Removes the role with the given name from this manager. If this
+ * node is the elected leader for the given role, this node will relinquish
+ * the leadership role
+ *
+ * @param roleName the name of the role to unregister
+ */
+ void unregister(String roleName);
+
+ /**
+ * Returns a boolean value indicating whether or not this node
+ * is the elected leader for the given role
+ *
+ * @param roleName the name of the role
+ * @return <code>true</code> if the node is the elected leader, <code>false</code> otherwise.
+ */
+ boolean isLeader(String roleName);
+
+ /**
+ * @return <code>true</code> if the manager is stopped, false otherwise.
+ */
+ boolean isStopped();
+
+ /**
+ * Stops managing leader elections and relinquishes the role as leader
+ * for all registered roles. If the LeaderElectionManager is later started
+ * again, all previously registered roles will still be registered.
+ */
+ void stop();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
new file mode 100644
index 0000000..79c7a75
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/LeaderElectionStateChangeListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.leader.election;
+
+/**
+ * Callback interface that can be used to listen for state changes so that the node
+ * can be notified when it becomes the Elected Leader for a role or is no longer the
+ * Elected Leader.
+ *
+ * A listener is associated with a role by passing it to
+ * {@link LeaderElectionManager#register(String, LeaderElectionStateChangeListener)}.
+ * Note that CuratorLeaderElectionManager submits each callback to its thread pool
+ * instead of invoking it directly from the election thread.
+ */
+public interface LeaderElectionStateChangeListener {
+ /**
+ * This method is invoked whenever this node is elected leader
+ */
+ void onLeaderElection();
+
+ /**
+ * This method is invoked whenever this node no longer is the elected leader.
+ */
+ void onLeaderRelinquish();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 24d2295..beb71c1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -168,6 +168,12 @@ nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
nifi.cluster.node.unicast.manager.address=${nifi.cluster.node.unicast.manager.address}
nifi.cluster.node.unicast.manager.protocol.port=${nifi.cluster.node.unicast.manager.protocol.port}
+# zookeeper properties, used for cluster management #
+nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
+nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
+nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
+nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
+
# cluster manager properties (only configure for cluster manager) #
nifi.cluster.is.manager=${nifi.cluster.is.manager}
nifi.cluster.manager.address=${nifi.cluster.manager.address}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b981bde..4fdda06 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -760,12 +760,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
clusterManager.requestReconnection(nodeDTO.getNodeId(), userDn);
} else if (Node.Status.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
clusterManager.requestDisconnection(nodeDTO.getNodeId(), userDn);
- } else {
- // handle primary
- final Boolean primary = nodeDTO.isPrimary();
- if (primary != null && primary) {
- clusterManager.setPrimaryNode(nodeDTO.getNodeId(), userDn);
- }
}
final String nodeId = nodeDTO.getNodeId();
http://git-wip-us.apache.org/repos/asf/nifi/blob/1ac05266/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc764b0..442acdd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -765,6 +765,17 @@ language governing permissions and limitations under the License. -->
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.10.0</version>
+ </dependency>
+
<!-- Test Dependencies for testing interactions with ZooKeeper -->
<dependency>
@@ -779,6 +790,8 @@ language governing permissions and limitations under the License. -->
<version>6.8.8</version>
<scope>test</scope>
</dependency>
+
+
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>