You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:48 UTC
[2/4] nifi git commit: NIFI-1678: Started refactoring heartbeating
mechanism, using a new package: org.apache.nifi.cluster.coordination
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
new file mode 100644
index 0000000..fc75601
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManagerCoordinator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
+import org.apache.nifi.cluster.protocol.ConnectionRequest;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WebClusterManagerCoordinator implements ClusterCoordinator {
+ private static final Logger logger = LoggerFactory.getLogger(WebClusterManagerCoordinator.class);
+ private static final AtomicLong nodeStatusIdGenerator = new AtomicLong(0L);
+
+ private final WebClusterManager clusterManager;
+ private final ClusterManagerProtocolSender protocolSender;
+
+ /**
+ * Creates a coordinator that delegates its work to the given cluster manager.
+ *
+ * @param clusterManager the cluster manager used to look up nodes and to issue connection/disconnection requests
+ * @param protocolSender the sender used to notify other nodes when a node's status changes
+ */
+ public WebClusterManagerCoordinator(final WebClusterManager clusterManager, final ClusterManagerProtocolSender protocolSender) {
+ this.clusterManager = clusterManager;
+ this.protocolSender = protocolSender;
+ }
+
+ /**
+ * Requests that the given node connect to the cluster. A node that the cluster manager does not yet
+ * know about is sent a new connection request; a known node has its status updated and is asked to reconnect.
+ *
+ * @param nodeId the identifier of the node that should connect
+ */
+ @Override
+ public void requestNodeConnect(final NodeIdentifier nodeId) {
+     final Node existingNode = clusterManager.getRawNode(nodeId.getId());
+
+     if (existingNode != null) {
+         // Known node: record the pending (re)connection before asking the manager to reconnect it.
+         final NodeConnectionStatus pendingStatus = new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED, "Requesting that Node Connect to the Cluster");
+         updateNodeStatus(nodeId, pendingStatus);
+         clusterManager.requestReconnection(nodeId.getId(), "Anonymous");
+         return;
+     }
+
+     // Unknown node: issue a fresh connection request.
+     clusterManager.requestConnection(new ConnectionRequest(nodeId));
+ }
+
+ /**
+ * Marks the given node as CONNECTED, logging an error if no node with that identifier is known.
+ *
+ * @param nodeId the identifier of the node whose connection has finished
+ */
+ @Override
+ public void finishNodeConnection(final NodeIdentifier nodeId) {
+     final NodeConnectionStatus connectedStatus = new NodeConnectionStatus(NodeConnectionState.CONNECTED);
+     if (updateNodeStatus(nodeId, connectedStatus)) {
+         return;
+     }
+
+     logger.error("Attempting to Finish Node Connection but could not find Node with Identifier {}", nodeId);
+ }
+
+ /**
+ * Asks the cluster manager to disconnect the given node. When the disconnection is due to a lack of
+ * heartbeat and the node is still known, the node is additionally flagged as disconnected by heartbeat.
+ * Any failure is logged rather than propagated to the caller.
+ *
+ * @param nodeId the identifier of the node to disconnect
+ * @param disconnectionCode the reason for the disconnection
+ * @param explanation a human-readable explanation that is passed on to the cluster manager
+ */
+ @Override
+ public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+     try {
+         clusterManager.requestDisconnection(nodeId, false, explanation);
+
+         if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+             final Node node = clusterManager.getRawNode(nodeId.getId());
+             if (node != null) {
+                 updateNodeStatus(node, Status.DISCONNECTED, true);
+             }
+         }
+     } catch (final Exception e) {
+         // SLF4J treats a trailing Throwable as the exception to log rather than as a placeholder argument,
+         // so the textual form is passed explicitly for the second '{}' and the exception itself is passed
+         // last so that the stack trace is emitted once, in the same log event.
+         logger.error("Failed to request node {} disconnect from cluster due to {}", nodeId, e.toString(), e);
+     }
+ }
+
+ /**
+ * Records that the given node asked to be disconnected and reports a corresponding event. The event is
+ * reported at ERROR severity for startup failures, mismatched flows and unknown reasons, and at INFO otherwise.
+ *
+ * @param nodeId the identifier of the node that requested disconnection
+ * @param disconnectionCode the reason for the disconnection
+ * @param explanation a human-readable explanation, included in the reported event
+ */
+ @Override
+ public void disconnectionRequestedByNode(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+     updateNodeStatus(nodeId, new NodeConnectionStatus(disconnectionCode, explanation));
+     reportEvent(nodeId, severityOf(disconnectionCode), "Node disconnected from cluster due to " + explanation);
+ }
+
+ /**
+ * Maps a disconnection code to the severity at which the disconnection is reported.
+ *
+ * @param disconnectionCode the reason for the disconnection
+ * @return ERROR for STARTUP_FAILURE, MISMATCHED_FLOWS and UNKNOWN; INFO for every other code
+ */
+ private static Severity severityOf(final DisconnectionCode disconnectionCode) {
+     switch (disconnectionCode) {
+         case STARTUP_FAILURE:
+         case MISMATCHED_FLOWS:
+         case UNKNOWN:
+             return Severity.ERROR;
+         default:
+             return Severity.INFO;
+     }
+ }
+
+ /**
+ * Looks up the connection status of the given node.
+ *
+ * @param nodeId the identifier of the node of interest
+ * @return the node's connection status, built from its current status and connection-requested timestamp,
+ *         or <code>null</code> if the cluster manager has no node with that identifier
+ */
+ @Override
+ public NodeConnectionStatus getConnectionStatus(final NodeIdentifier nodeId) {
+     final Node node = clusterManager.getNode(nodeId.getId());
+     if (node != null) {
+         // Node.Status and NodeConnectionState are bridged by constant name.
+         final NodeConnectionState state = NodeConnectionState.valueOf(node.getStatus().name());
+         return new NodeConnectionStatus(state, node.getConnectionRequestedTimestamp());
+     }
+
+     return null;
+ }
+
+ /**
+ * Returns the identifiers of all nodes that the cluster manager reports as being in the given state.
+ *
+ * @param state the connection state of interest
+ * @return the identifiers of the nodes whose status has the same name as the given state
+ */
+ @Override
+ public Set<NodeIdentifier> getNodeIdentifiers(final NodeConnectionState state) {
+     // NodeConnectionState and Node.Status are bridged by constant name.
+     final Set<Node> matchingNodes = clusterManager.getNodes(Status.valueOf(state.name()));
+     return matchingNodes.stream().map(Node::getNodeId).collect(Collectors.toSet());
+ }
+
+ /**
+ * Delegates to the cluster manager to determine whether the given hostname is blocked by the firewall.
+ *
+ * @param hostname the hostname to check
+ * @return the result of the cluster manager's <code>isBlockedByFirewall</code> check for the hostname
+ */
+ @Override
+ public boolean isBlockedByFirewall(final String hostname) {
+ return clusterManager.isBlockedByFirewall(hostname);
+ }
+
+ @Override
+ public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String event) {
+ final String messagePrefix = nodeId == null ? "" : nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- ";
+ switch (severity) {
+ case INFO:
+ logger.info(messagePrefix + event);
+ break;
+ case WARNING:
+ logger.warn(messagePrefix + event);
+ break;
+ case ERROR:
+ logger.error(messagePrefix + event);
+ break;
+ }
+
+ clusterManager.reportEvent(nodeId, severity, messagePrefix + event);
+ }
+
+ /**
+ * Delegates to the cluster manager to set the primary node.
+ *
+ * @param nodeId the identifier of the node that is passed to the cluster manager's <code>setPrimaryNodeId</code>
+ */
+ @Override
+ public void setPrimaryNode(final NodeIdentifier nodeId) {
+ clusterManager.setPrimaryNodeId(nodeId);
+ }
+
+ /**
+ * Resolves a node UUID to the identifier of the corresponding node.
+ *
+ * @param uuid the UUID of the node to look up
+ * @return the node's identifier, or <code>null</code> if the cluster manager has no node with that UUID
+ */
+ @Override
+ public NodeIdentifier getNodeIdentifier(final String uuid) {
+     final Node node = clusterManager.getNode(uuid);
+     if (node == null) {
+         return null;
+     }
+
+     return node.getNodeId();
+ }
+
+
+ /**
+ * Sets the status of the node that has the given ID and, if the status actually changed, notifies all
+ * CONNECTED and CONNECTING nodes of the change.
+ *
+ * @param nodeId the ID of the node whose status is changed
+ * @param status the new status of the node
+ * @return <code>true</code> if the node exists (whether or not its status changed), <code>false</code> if no node exists with the given ID
+ */
+ private boolean updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus status) {
+     // The update ID is drawn before the lookup, so the generator advances even if the node is unknown.
+     final long updateId = nodeStatusIdGenerator.incrementAndGet();
+
+     final Node targetNode = clusterManager.getRawNode(nodeId.getId());
+     if (targetNode == null) {
+         return false;
+     }
+
+     final Status newStatus = Status.valueOf(status.getState().name());
+     final Status previousStatus = targetNode.setStatus(newStatus);
+     if (newStatus == previousStatus) {
+         // Nothing changed, so there is nothing to broadcast.
+         return true;
+     }
+
+     final Set<NodeIdentifier> recipients = clusterManager.getNodes(Status.CONNECTED, Status.CONNECTING).stream()
+         .map(Node::getNodeId)
+         .collect(Collectors.toSet());
+
+     final NodeStatusChangeMessage changeMessage = new NodeStatusChangeMessage();
+     changeMessage.setNodeId(nodeId);
+     changeMessage.setNodeConnectionStatus(status);
+     // TODO: When this is sent from one node to another, we need to ensure that we check the current
+     // 'revision number' on the node and include that as the Update ID because we need a way to indicate
+     // which status change event occurred first. I.e., when the status of a node is updated on any node
+     // that is not the elected leader, we need to ensure that our nodeStatusIdGenerator also is updated.
+     changeMessage.setStatusUpdateIdentifier(updateId);
+
+     protocolSender.notifyNodeStatusChange(recipients, changeMessage);
+     return true;
+ }
+
+ /**
+ * Updates the status of the given node to the given new status. This method exists only because the NCM currently handles
+ * some of the status changing and we want it to call into this coordinator instead to change the status.
+ * Equivalent to calling the three-argument overload with <code>heartbeatDisconnect</code> set to <code>false</code>.
+ *
+ * @param rawNode the node whose status should be updated
+ * @param nodeStatus the new status of the node
+ */
+ void updateNodeStatus(final Node rawNode, final Status nodeStatus) {
+ // TODO: Remove this method when NCM is removed
+ updateNodeStatus(rawNode, nodeStatus, false);
+ }
+
+
+ /**
+ * Changes the status of the given node and, if the requested status differs from the node's previous status,
+ * notifies all CONNECTED and CONNECTING nodes of the change. This overload exists only because the NCM still
+ * performs some status changes itself and should route them through this coordinator.
+ *
+ * @param rawNode the node whose status should be updated
+ * @param nodeStatus the new status of the node
+ * @param heartbeatDisconnect indicates whether or not the node is being disconnected due to lack of heartbeat
+ */
+ void updateNodeStatus(final Node rawNode, final Status nodeStatus, final boolean heartbeatDisconnect) {
+     // TODO: Remove this method when NCM is removed.
+     final long updateId = nodeStatusIdGenerator.incrementAndGet();
+
+     // NOTE(review): when heartbeatDisconnect is true, nodeStatus is not applied to the node itself, yet it is
+     // still used for the change check and the notification below -- callers presumably pass DISCONNECTED; confirm.
+     final Status previousStatus = heartbeatDisconnect
+         ? rawNode.setHeartbeatDisconnection()
+         : rawNode.setStatus(nodeStatus);
+
+     if (nodeStatus == previousStatus) {
+         return;
+     }
+
+     final Set<NodeIdentifier> recipients = clusterManager.getNodes(Status.CONNECTED, Status.CONNECTING).stream()
+         .map(Node::getNodeId)
+         .collect(Collectors.toSet());
+
+     final NodeStatusChangeMessage changeMessage = new NodeStatusChangeMessage();
+     changeMessage.setNodeId(rawNode.getNodeId());
+     changeMessage.setNodeConnectionStatus(new NodeConnectionStatus(NodeConnectionState.valueOf(nodeStatus.name())));
+     changeMessage.setStatusUpdateIdentifier(updateId);
+
+     protocolSender.notifyNodeStatusChange(recipients, changeMessage);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
index 3bb3c1a..6f71834 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/node/Node.java
@@ -18,12 +18,7 @@ package org.apache.nifi.cluster.node;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Represents a connected flow controller. Nodes always have an immutable identifier and a status. The status may be changed, but never null.
@@ -35,8 +30,6 @@ import org.slf4j.LoggerFactory;
*/
public class Node implements Cloneable, Comparable<Node> {
- private static final Logger lockLogger = LoggerFactory.getLogger("cluster.lock");
-
/**
* The semantics of a Node status are as follows:
* <ul>
@@ -66,15 +59,6 @@ public class Node implements Cloneable, Comparable<Node> {
*/
private Status status;
- /**
- * the last heartbeat received by from the node
- */
- private Heartbeat lastHeartbeat;
-
- /**
- * the payload of the last heartbeat received from the node
- */
- private HeartbeatPayload lastHeartbeatPayload;
/**
* the last time the connection for this node was requested
@@ -101,40 +85,6 @@ public class Node implements Cloneable, Comparable<Node> {
}
/**
- * Returns the last received heartbeat or null if no heartbeat has been set.
- *
- * @return a heartbeat or null
- */
- public Heartbeat getHeartbeat() {
- return lastHeartbeat;
- }
-
- public HeartbeatPayload getHeartbeatPayload() {
- return lastHeartbeatPayload;
- }
-
- /**
- * Sets the last heartbeat received.
- *
- * @param heartbeat a heartbeat
- *
- * @throws ProtocolException if the heartbeat's payload failed unmarshalling
- */
- public void setHeartbeat(final Heartbeat heartbeat) throws ProtocolException {
- this.lastHeartbeat = heartbeat;
- if (this.lastHeartbeat == null) {
- this.lastHeartbeatPayload = null;
- } else {
- final byte[] payload = lastHeartbeat.getPayload();
- if (payload == null || payload.length == 0) {
- this.lastHeartbeatPayload = null;
- } else {
- this.lastHeartbeatPayload = HeartbeatPayload.unmarshal(payload);
- }
- }
- }
-
- /**
* Returns the time of the last received connection request for this node.
*
* @return the time when the connection request for this node was received.
@@ -166,34 +116,38 @@ public class Node implements Cloneable, Comparable<Node> {
/**
* Sets the status to disconnected and flags the node as being disconnected by lack of heartbeat.
*/
- public void setHeartbeatDisconnection() {
- setStatus(Status.DISCONNECTED);
+ public Status setHeartbeatDisconnection() {
+ final Status oldStatus = setStatus(Status.DISCONNECTED);
heartbeatDisconnection = true;
+ return oldStatus;
}
/**
* @return the status
*/
- public Status getStatus() {
+ public synchronized Status getStatus() {
return status;
}
/**
+ * Updates the status to the given value, returning the previous status
+ *
* @param status a status
+ * @return the previous status for the node
*/
- public void setStatus(final Status status) {
+ public synchronized Status setStatus(final Status status) {
if (status == null) {
throw new IllegalArgumentException("Status may not be null.");
}
+ final Status oldStatus = this.status;
this.status = status;
heartbeatDisconnection = false;
+ return oldStatus;
}
@Override
public Node clone() {
final Node clone = new Node(nodeId, status);
- clone.lastHeartbeat = lastHeartbeat;
- clone.lastHeartbeatPayload = lastHeartbeatPayload;
clone.heartbeatDisconnection = heartbeatDisconnection;
clone.connectionRequestedTimestamp = connectionRequestedTimestamp;
return clone;
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..e218e05
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAbstractHeartbeatMonitor {
+ private TestingServer zkServer;
+ private NodeIdentifier nodeId;
+ private TestFriendlyHeartbeatMonitor monitor;
+
+ /**
+ * Creates a ZooKeeper testing server and a node identifier with a random UUID for use by each test.
+ *
+ * @throws Exception if the testing server cannot be created or started
+ */
+ @Before
+ public void setup() throws Exception {
+ // NOTE(review): the 'true' constructor argument presumably already starts the server, which would make
+ // the explicit start() call redundant -- confirm against the TestingServer documentation.
+ zkServer = new TestingServer(true);
+ zkServer.start();
+ nodeId = new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 9999, "localhost", 8888, "localhost", null, false);
+ }
+
+ /**
+ * Shuts down the ZooKeeper testing server and the heartbeat monitor created by the test, if any.
+ * The monitor is stopped even if shutting down the testing server fails, so that a failure in one
+ * test does not leave a monitor running in the background.
+ *
+ * @throws IOException if the testing server cannot be stopped or closed
+ */
+ @After
+ public void clear() throws IOException {
+     try {
+         if (zkServer != null) {
+             try {
+                 zkServer.stop();
+             } finally {
+                 // Always release the server's resources, even if stop() failed.
+                 zkServer.close();
+             }
+         }
+     } finally {
+         if (monitor != null) {
+             monitor.stop();
+         }
+     }
+ }
+
+ /**
+ * Verifies that a node that sends a heartbeat that indicates that it is 'connected' is asked to connect to
+ * cluster if the cluster coordinator does not know about the node. Also checks that exactly one event is
+ * reported to the coordinator as a result.
+ *
+ * @throws IOException declared by the test signature
+ * @throws InterruptedException if interrupted
+ */
+ @Test
+ public void testNewConnectedHeartbeatFromUnknownNode() throws IOException, InterruptedException {
+ final List<NodeIdentifier> requestedToConnect = Collections.synchronizedList(new ArrayList<>());
+ // Deliberately does not call super, so the coordinator keeps no status for the node.
+ final ClusterCoordinatorAdapter coordinator = new ClusterCoordinatorAdapter() {
+ @Override
+ public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+ requestedToConnect.add(nodeId);
+ }
+ };
+
+ final TestFriendlyHeartbeatMonitor monitor = createMonitor(coordinator);
+
+ // Ensure that we request the Unknown Node connect to the cluster
+ final NodeHeartbeat heartbeat = createHeartbeat(nodeId, NodeConnectionState.CONNECTED);
+ monitor.addHeartbeat(heartbeat);
+ monitor.waitForProcessed();
+
+ assertEquals(1, requestedToConnect.size());
+ assertEquals(nodeId, requestedToConnect.get(0));
+ assertEquals(1, coordinator.getEvents().size());
+ }
+
+ /**
+ * Verifies that a node that sends a heartbeat that indicates that it is 'connected' if previously
+ * manually disconnected, will be asked to disconnect from the cluster again and will not be asked to connect.
+ *
+ * @throws InterruptedException if interrupted
+ */
+ @Test
+ public void testHeartbeatFromManuallyDisconnectedNode() throws InterruptedException {
+ final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+ final Set<NodeIdentifier> requestedToDisconnect = Collections.synchronizedSet(new HashSet<>());
+ final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+ @Override
+ public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+ super.requestNodeConnect(nodeId);
+ requestedToConnect.add(nodeId);
+ }
+
+ @Override
+ public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+ super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+ requestedToDisconnect.add(nodeId);
+ }
+ };
+
+ final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+ // Simulate the manual disconnection before the node's 'connected' heartbeat arrives.
+ adapter.requestNodeDisconnect(nodeId, DisconnectionCode.USER_DISCONNECTED, "Unit Testing");
+ monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+ monitor.waitForProcessed();
+
+ // requestedToDisconnect is a Set, so repeated disconnect requests for the same node still count as one.
+ assertEquals(1, requestedToDisconnect.size());
+ assertEquals(nodeId, requestedToDisconnect.iterator().next());
+ assertTrue(requestedToConnect.isEmpty());
+ }
+
+
+ /**
+ * Verifies that when a node that the coordinator has in the 'connecting' state sends a heartbeat indicating
+ * that it is 'connected', the coordinator is told to finish the node's connection and is not asked to
+ * connect the node again.
+ *
+ * @throws InterruptedException if interrupted
+ */
+ @Test
+ public void testConnectingNodeMarkedConnectedWhenHeartbeatReceived() throws InterruptedException {
+ final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+ final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+ final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+ @Override
+ public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+ super.requestNodeConnect(nodeId);
+ requestedToConnect.add(nodeId);
+ }
+
+ @Override
+ public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+ super.finishNodeConnection(nodeId);
+ connected.add(nodeId);
+ }
+ };
+
+ final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+ adapter.requestNodeConnect(nodeId); // set state to 'connecting'
+ // Discard the connect request made by the test itself so only monitor-initiated requests are counted.
+ requestedToConnect.clear();
+
+ monitor.addHeartbeat(createHeartbeat(nodeId, NodeConnectionState.CONNECTED));
+ monitor.waitForProcessed();
+
+ assertEquals(1, connected.size());
+ assertEquals(nodeId, connected.iterator().next());
+ assertTrue(requestedToConnect.isEmpty());
+ }
+
+
+ /**
+ * Verifies that a heartbeat carrying a NODE_SHUTDOWN disconnection code, sent by a node for which the
+ * coordinator holds no status, does not cause the coordinator to be asked to connect the node, finish its
+ * connection, or disconnect it.
+ *
+ * @throws InterruptedException if interrupted
+ */
+ @Test
+ public void testDisconnectedHeartbeatOnStartup() throws InterruptedException {
+ final Set<NodeIdentifier> requestedToConnect = Collections.synchronizedSet(new HashSet<>());
+ final Set<NodeIdentifier> connected = Collections.synchronizedSet(new HashSet<>());
+ final Set<NodeIdentifier> disconnected = Collections.synchronizedSet(new HashSet<>());
+ final ClusterCoordinatorAdapter adapter = new ClusterCoordinatorAdapter() {
+ @Override
+ public synchronized void requestNodeConnect(final NodeIdentifier nodeId) {
+ super.requestNodeConnect(nodeId);
+ requestedToConnect.add(nodeId);
+ }
+
+ @Override
+ public synchronized void finishNodeConnection(final NodeIdentifier nodeId) {
+ super.finishNodeConnection(nodeId);
+ connected.add(nodeId);
+ }
+
+ @Override
+ public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+ super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
+ disconnected.add(nodeId);
+ }
+ };
+
+ final TestFriendlyHeartbeatMonitor monitor = createMonitor(adapter);
+
+ requestedToConnect.clear();
+
+ monitor.addHeartbeat(createHeartbeat(nodeId, DisconnectionCode.NODE_SHUTDOWN));
+ monitor.waitForProcessed();
+
+ assertTrue(connected.isEmpty());
+ assertTrue(requestedToConnect.isEmpty());
+ assertTrue(disconnected.isEmpty());
+ }
+
+ /**
+ * Creates a heartbeat for the given node, timestamped with the current time, whose connection status is
+ * built from the given disconnection code.
+ *
+ * @param nodeId the node that the heartbeat is from
+ * @param disconnectionCode the disconnection code to report in the heartbeat's connection status
+ * @return the new heartbeat
+ */
+ private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode) {
+ final NodeConnectionStatus status = new NodeConnectionStatus(disconnectionCode);
+ return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+ }
+
+ /**
+ * Creates a heartbeat for the given node, timestamped with the current time, whose connection status is
+ * built from the given connection state.
+ *
+ * @param nodeId the node that the heartbeat is from
+ * @param state the connection state to report in the heartbeat's connection status
+ * @return the new heartbeat
+ */
+ private NodeHeartbeat createHeartbeat(final NodeIdentifier nodeId, final NodeConnectionState state) {
+ final NodeConnectionStatus status = new NodeConnectionStatus(state);
+ return new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(), status, false, 0, 0, 0, 0);
+ }
+
+ /**
+ * Creates and starts a heartbeat monitor that uses the given coordinator. The monitor is stored in the
+ * <code>monitor</code> field before it is started so that {@link #clear()} can stop it after the test.
+ *
+ * @param coordinator the coordinator that the monitor should use
+ * @return the started monitor
+ */
+ private TestFriendlyHeartbeatMonitor createMonitor(final ClusterCoordinator coordinator) {
+ monitor = new TestFriendlyHeartbeatMonitor(coordinator, createProperties());
+ monitor.start();
+ return monitor;
+ }
+
+ /**
+ * Builds the properties used to configure the heartbeat monitor: a 10 ms heartbeat interval and
+ * ZooKeeper settings that point at the testing server created in {@link #setup()}.
+ *
+ * @return the properties for the monitor
+ */
+ private Properties createProperties() {
+ final Properties properties = new Properties();
+ properties.setProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL, "10 ms");
+ properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, zkServer.getConnectString());
+ properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, "3 secs");
+ properties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, "3 secs");
+ properties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, "/nifi");
+ return properties;
+ }
+
+ /**
+ * In-memory {@link ClusterCoordinator} for tests. It tracks the connection status of each node in a map and
+ * records every reported event. All access to the map and the event list is guarded by this object's monitor.
+ */
+ private static class ClusterCoordinatorAdapter implements ClusterCoordinator {
+     private final Map<NodeIdentifier, NodeConnectionStatus> statuses = new HashMap<>();
+     private final List<ReportedEvent> events = new ArrayList<>();
+
+     /** Records the node as CONNECTING. */
+     @Override
+     public synchronized void requestNodeConnect(NodeIdentifier nodeId) {
+         statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTING));
+     }
+
+     /** Records the node as CONNECTED. */
+     @Override
+     public synchronized void finishNodeConnection(NodeIdentifier nodeId) {
+         statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.CONNECTED));
+     }
+
+     /** Records the node as DISCONNECTED; the disconnection code and explanation are not stored. */
+     @Override
+     public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+         statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+     }
+
+     /** Records the node as DISCONNECTED; the disconnection code and explanation are not stored. */
+     @Override
+     public synchronized void disconnectionRequestedByNode(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+         statuses.put(nodeId, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED));
+     }
+
+     /** @return the recorded status of the node, or <code>null</code> if none has been recorded */
+     @Override
+     public synchronized NodeConnectionStatus getConnectionStatus(NodeIdentifier nodeId) {
+         return statuses.get(nodeId);
+     }
+
+     /** @return the identifiers of all nodes whose recorded state is the given state */
+     @Override
+     public synchronized Set<NodeIdentifier> getNodeIdentifiers(NodeConnectionState state) {
+         return statuses.entrySet().stream().filter(p -> p.getValue().getState() == state).map(p -> p.getKey()).collect(Collectors.toSet());
+     }
+
+     /** @return always <code>false</code> */
+     @Override
+     public synchronized boolean isBlockedByFirewall(String hostname) {
+         return false;
+     }
+
+     /** Records the event so that tests can inspect it via {@link #getEvents()}. */
+     @Override
+     public synchronized void reportEvent(NodeIdentifier nodeId, Severity severity, String event) {
+         events.add(new ReportedEvent(nodeId, severity, event));
+     }
+
+     /** Does nothing. */
+     @Override
+     public synchronized void setPrimaryNode(NodeIdentifier nodeId) {
+     }
+
+     /** @return a copy of the events reported so far */
+     synchronized List<ReportedEvent> getEvents() {
+         return new ArrayList<>(events);
+     }
+
+     /**
+      * Synchronized like every other accessor of <code>statuses</code>, because the map is a plain HashMap
+      * that is written by other threads through the methods above.
+      *
+      * @return the identifier of the first recorded node whose ID equals the given UUID, or <code>null</code> if there is none
+      */
+     @Override
+     public synchronized NodeIdentifier getNodeIdentifier(final String uuid) {
+         return statuses.keySet().stream().filter(p -> p.getId().equals(uuid)).findFirst().orElse(null);
+     }
+ }
+
+ /**
+ * Immutable record of a single event passed to the test coordinator's <code>reportEvent</code>.
+ */
+ public static class ReportedEvent {
+ private final NodeIdentifier nodeId;
+ private final Severity severity;
+ private final String event;
+
+ /**
+ * @param nodeId the node that the event was reported for
+ * @param severity the severity that the event was reported with
+ * @param event the event text
+ */
+ public ReportedEvent(NodeIdentifier nodeId, Severity severity, String event) {
+ this.nodeId = nodeId;
+ this.severity = severity;
+ this.event = event;
+ }
+
+ /** @return the node that the event was reported for */
+ public NodeIdentifier getNodeId() {
+ return nodeId;
+ }
+
+ /** @return the severity that the event was reported with */
+ public Severity getSeverity() {
+ return severity;
+ }
+
+ /** @return the event text */
+ public String getEvent() {
+ return event;
+ }
+ }
+
+
+ /**
+ * Heartbeat monitor for tests. Heartbeats are held in an in-memory map that tests populate directly, and
+ * tests can block until a monitoring pass has completed via {@link #waitForProcessed()}.
+ */
+ private static class TestFriendlyHeartbeatMonitor extends AbstractHeartbeatMonitor {
+     private final Map<NodeIdentifier, NodeHeartbeat> heartbeats = new HashMap<>();
+     private final Object mutex = new Object();
+     // Number of monitorHeartbeats() passes that have completed; guarded by mutex.
+     private long completedPasses = 0L;
+
+     public TestFriendlyHeartbeatMonitor(ClusterCoordinator clusterCoordinator, Properties properties) {
+         super(clusterCoordinator, properties);
+     }
+
+     /** @return the live map of heartbeats added via {@link #addHeartbeat(NodeHeartbeat)} */
+     @Override
+     protected synchronized Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+         return heartbeats;
+     }
+
+     /** Runs a monitoring pass and then wakes any thread blocked in {@link #waitForProcessed()}. */
+     @Override
+     public synchronized void monitorHeartbeats() {
+         super.monitorHeartbeats();
+
+         synchronized (mutex) {
+             completedPasses++;
+             mutex.notifyAll();
+         }
+     }
+
+     /** Registers the heartbeat as the latest one for its node. */
+     synchronized void addHeartbeat(final NodeHeartbeat heartbeat) {
+         heartbeats.put(heartbeat.getNodeIdentifier(), heartbeat);
+     }
+
+     @Override
+     public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+         heartbeats.remove(nodeId);
+     }
+
+     /**
+      * Blocks until a monitoring pass completes after this method is entered. The wait is performed in a
+      * loop on the pass counter because <code>Object.wait()</code> may return spuriously; without the loop
+      * a spurious wakeup would let the test assert before any pass had actually finished.
+      *
+      * @throws InterruptedException if interrupted while waiting
+      */
+     void waitForProcessed() throws InterruptedException {
+         synchronized (mutex) {
+             final long passesAtEntry = completedPasses;
+             while (completedPasses == passesAtEntry) {
+                 mutex.wait();
+             }
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
index 1d42729..86057ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/ConnectionException.java
@@ -24,6 +24,10 @@ public class ConnectionException extends RuntimeException {
private static final long serialVersionUID = -1378294897231234028L;
+ public ConnectionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
public ConnectionException() {
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 75395b7..13a01bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -38,6 +38,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -61,6 +62,9 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -78,6 +82,8 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.connectable.StandardConnection;
+import org.apache.nifi.controller.cluster.Heartbeater;
+import org.apache.nifi.controller.cluster.ZooKeeperHeartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
@@ -299,10 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
private final StringEncryptor encryptor;
- /**
- * cluster protocol sender
- */
- private final NodeProtocolSender protocolSender;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
@@ -311,14 +313,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* timer to periodically send heartbeats to the cluster
*/
- private ScheduledFuture<?> heartbeatGeneratorFuture;
private ScheduledFuture<?> heartbeatSenderFuture;
+ private final Heartbeater heartbeater;
// guarded by FlowController lock
/**
* timer task to generate heartbeats
*/
- private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
+ private final AtomicReference<HeartbeatSendTask> heartbeatSendTask = new AtomicReference<>(null);
// guarded by rwLock
/**
@@ -334,10 +336,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private String clusterManagerDN;
// guarded by rwLock
- /**
- * true if connected to a cluster
- */
- private boolean connected;
+ private NodeConnectionStatus connectionStatus;
+ private final ConcurrentMap<NodeIdentifier, VersionedNodeConnectionStatus> nodeStatuses = new ConcurrentHashMap<>();
// guarded by rwLock
private String instanceId;
@@ -471,7 +471,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.configuredForClustering = configuredForClustering;
this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(properties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
- this.protocolSender = protocolSender;
try {
this.templateManager = new TemplateManager(properties.getTemplateDirectory());
} catch (final IOException e) {
@@ -508,9 +507,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Initialize the Embedded ZooKeeper server, if applicable
- if (properties.isStartEmbeddedZooKeeper()) {
+ if (properties.isStartEmbeddedZooKeeper() && configuredForClustering) {
try {
zooKeeperStateServer = ZooKeeperStateServer.create(properties);
+ zooKeeperStateServer.start();
} catch (final IOException | ConfigException e) {
throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
}
@@ -526,11 +526,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, false));
+ heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED)));
+
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4);
+ heartbeater = new ZooKeeperHeartbeater(protocolSender, properties);
} else {
leaderElectionManager = null;
+ heartbeater = null;
}
}
@@ -1003,6 +1006,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+ /**
+ * @return the Heartbeater through which this node's heartbeat messages are sent, or <code>null</code> if
+ * this controller is not configured for clustering
+ */
+ public Heartbeater getHeartbeater() {
+ return heartbeater;
+ }
+
/**
* @return the BulletinRepository for storing and retrieving Bulletins
*/
@@ -1164,6 +1171,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalStateException("Controller already stopped or still stopping...");
}
+ if (heartbeater != null) {
+ sendShutdownNotification();
+ }
+
if (leaderElectionManager != null) {
leaderElectionManager.stop();
}
@@ -1253,6 +1264,39 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
/**
+ * Sends a notification to the cluster that the node was shut down.
+ */
+ private void sendShutdownNotification() {
+     // Generate a heartbeat message and publish it, indicating that we are shutting down.
+     // createHeartbeatMessage() returns null when the heartbeat could not be built, in which case there is nothing to send.
+     final HeartbeatMessage heartbeatMsg = createHeartbeatMessage();
+     if (heartbeatMsg == null) {
+         LOG.warn("Failed to create NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+             + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster");
+         return;
+     }
+
+     // Keep the node identifier and payload of the generated heartbeat, but report NODE_SHUTDOWN and a non-primary node
+     final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+     final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NODE_SHUTDOWN);
+     heartbeatMsg.setHeartbeat(new Heartbeat(heartbeat.getNodeIdentifier(), false, connectionStatus, heartbeat.getPayload()));
+
+     // The task handles its own IOException, so its Future completes normally even when the send fails.
+     // Record the failure so that success is only reported when the heartbeat was actually sent.
+     final AtomicReference<IOException> sendFailure = new AtomicReference<>(null);
+     final Runnable sendNotification = new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 heartbeater.send(heartbeatMsg);
+             } catch (IOException e) {
+                 sendFailure.set(e);
+                 LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message. Cluster may not be notified of "
+                     + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster", e);
+             }
+         }
+     };
+
+     final Future<?> hbFuture = processScheduler.submitFrameworkTask(sendNotification);
+     try {
+         // Bound the wait so that an unreachable cluster cannot hold up shutdown
+         hbFuture.get(3, TimeUnit.SECONDS);
+         if (sendFailure.get() == null) {
+             LOG.info("Successfully sent Shutdown Notification to cluster");
+         }
+     } catch (final Exception e) {
+         if (e instanceof InterruptedException) {
+             // restore the interrupt flag for the thread that is driving the shutdown
+             Thread.currentThread().interrupt();
+         }
+         LOG.warn("Failed to send NODE_SHUTDOWN heartbeat message in time. Cluster may not be notified of "
+             + "shutdown and may have to wait for the heartbeats to time out before noticing that the node left the cluster", e);
+     }
+ }
+
+
+ /**
* Serializes the current state of the controller to the given OutputStream
*
* @param serializer serializer
@@ -1374,7 +1418,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
} finally {
writeLock.unlock();
}
@@ -2951,14 +2995,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
-
stopHeartbeating();
- final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask();
- heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
- heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
-
- heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(new HeartbeatSendTask(protocolSender), 250, 250, TimeUnit.MILLISECONDS);
+ final HeartbeatSendTask sendTask = new HeartbeatSendTask();
+ this.heartbeatSendTask.set(sendTask);
+ heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
} finally {
writeLock.unlock();
}
@@ -2986,7 +3027,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
* @throws IllegalStateException if not clustered
*/
public void stopHeartbeating() throws IllegalStateException {
-
if (!configuredForClustering) {
throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
}
@@ -2997,10 +3037,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return;
}
- if (heartbeatGeneratorFuture != null) {
- heartbeatGeneratorFuture.cancel(false);
- }
-
if (heartbeatSenderFuture != null) {
heartbeatSenderFuture.cancel(false);
}
@@ -3016,8 +3052,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isHeartbeating() {
readLock.lock();
try {
- return heartbeatGeneratorFuture != null && !heartbeatGeneratorFuture.isCancelled()
- && heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
+ return heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
} finally {
readLock.unlock();
}
@@ -3112,6 +3147,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
boolean isChanging = false;
if (this.clustered != clustered) {
isChanging = true;
+
+ if (clustered) {
+ LOG.info("Cluster State changed from Not Clustered to Clustered");
+ } else {
+ LOG.info("Cluster State changed from Clustered to Not Clustered");
+ }
}
// mark the new cluster status
@@ -3185,6 +3226,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
zooKeeperStateServer.shutdown();
}
stateManagerProvider.disableClusterProvider();
+
+ setPrimary(false);
}
final List<RemoteProcessGroup> remoteGroups = getGroup(getRootGroupId()).findAllRemoteProcessGroups();
@@ -3194,7 +3237,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
} finally {
writeLock.unlock();
}
@@ -3230,12 +3273,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, primary, connected));
+ final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
// Emit a bulletin detailing the fact that the primary node state has changed
- final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
- final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
- bulletinRepository.addBulletin(bulletin);
+ if (oldBean == null || oldBean.isPrimary() != primary) {
+ final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
+ final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
+ bulletinRepository.addBulletin(bulletin);
+ LOG.info(message);
+ }
}
static boolean areEqual(final String a, final String b) {
@@ -3602,19 +3648,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isConnected() {
rwLock.readLock().lock();
try {
- return connected;
+ // connectionStatus has no initial value and may still be null if setConnectionStatus(...) has not been
+ // called yet; a node without a status is reported as not connected instead of failing here
+ final NodeConnectionStatus currentStatus = connectionStatus;
+ return currentStatus != null && currentStatus.getState() == NodeConnectionState.CONNECTED;
} finally {
rwLock.readLock().unlock();
}
}
- public void setConnected(final boolean connected) {
+ public void setConnectionStatus(final NodeConnectionStatus connectionStatus) {
rwLock.writeLock().lock();
try {
- this.connected = connected;
+ this.connectionStatus = connectionStatus;
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connected));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
} finally {
rwLock.writeLock().unlock();
}
@@ -3628,25 +3674,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return;
}
- final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
+ final HeartbeatSendTask task = heartbeatSendTask.get();
if (task != null) {
- task.run();
+ clusterTaskExecutor.submit(task);
}
}
private class HeartbeatSendTask implements Runnable {
-
- private final NodeProtocolSender protocolSender;
private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
- public HeartbeatSendTask(final NodeProtocolSender protocolSender) {
- if (protocolSender == null) {
- throw new IllegalArgumentException("NodeProtocolSender may not be null.");
- }
- this.protocolSender = protocolSender;
- }
-
@Override
public void run() {
try {
@@ -3654,19 +3691,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return;
}
- final HeartbeatMessageGeneratorTask task = heartbeatMessageGeneratorTaskRef.get();
- if (task == null) {
- return;
- }
-
- final HeartbeatMessage message = task.getHeartbeatMessage();
+ final HeartbeatMessage message = createHeartbeatMessage();
if (message == null) {
heartbeatLogger.debug("No heartbeat to send");
return;
}
final long sendStart = System.nanoTime();
- protocolSender.heartbeat(message);
+ heartbeater.send(message);
+
final long sendNanos = System.nanoTime() - sendStart;
final long sendMillis = TimeUnit.NANOSECONDS.toMillis(sendNanos);
@@ -3679,58 +3712,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
heartbeatLogger.debug(usae.getMessage());
}
} catch (final Throwable ex) {
- heartbeatLogger.warn("Failed to send heartbeat to cluster manager due to: " + ex);
- if (heartbeatLogger.isDebugEnabled()) {
- heartbeatLogger.warn("", ex);
- }
+ heartbeatLogger.warn("Failed to send heartbeat due to: " + ex, ex);
}
}
}
- private class HeartbeatMessageGeneratorTask implements Runnable {
-
- private final AtomicReference<HeartbeatMessage> heartbeatMessageRef = new AtomicReference<>();
-
- @Override
- public void run() {
- final HeartbeatMessage heartbeatMessage = createHeartbeatMessage();
- if (heartbeatMessage != null) {
- heartbeatMessageRef.set(heartbeatMessage);
- }
- }
-
- public HeartbeatMessage getHeartbeatMessage() {
- return heartbeatMessageRef.getAndSet(null);
- }
-
- private HeartbeatMessage createHeartbeatMessage() {
- try {
- final HeartbeatBean bean = heartbeatBeanRef.get();
- if (bean == null) {
- return null;
+ /**
+ * Builds a heartbeat message from the current heartbeat bean. The payload carries the system start time,
+ * the active thread count and the total FlowFile count and byte count of the bean's root group; the heartbeat
+ * itself carries this node's identifier, the bean's primary flag and the bean's connection status.
+ *
+ * @return the generated heartbeat message, or <code>null</code> if anything fails while creating it
+ */
+ HeartbeatMessage createHeartbeatMessage() {
+ try {
+ HeartbeatBean bean = heartbeatBeanRef.get();
+ if (bean == null) {
+ // no heartbeat bean is available: build one from the current root group, reporting NOT_YET_CONNECTED
+ readLock.lock();
+ try {
+ final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED);
+ bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
+ } finally {
+ readLock.unlock();
}
+ }
- // create heartbeat payload
- final HeartbeatPayload hbPayload = new HeartbeatPayload();
- hbPayload.setSystemStartTime(systemStartTime);
- hbPayload.setActiveThreadCount(getActiveThreadCount());
+ // create heartbeat payload
+ final HeartbeatPayload hbPayload = new HeartbeatPayload();
+ hbPayload.setSystemStartTime(systemStartTime);
+ hbPayload.setActiveThreadCount(getActiveThreadCount());
- final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
- hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
- hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
+ final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
+ hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
+ hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
- // create heartbeat message
- final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal());
- final HeartbeatMessage message = new HeartbeatMessage();
- message.setHeartbeat(heartbeat);
+ // create heartbeat message
+ final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.getConnectionStatus(), hbPayload.marshal());
+ final HeartbeatMessage message = new HeartbeatMessage();
+ message.setHeartbeat(heartbeat);
- heartbeatLogger.debug("Generated heartbeat");
+ heartbeatLogger.debug("Generated heartbeat");
- return message;
- } catch (final Throwable ex) {
- LOG.warn("Failed to create heartbeat due to: " + ex, ex);
- return null;
- }
+ return message;
+ } catch (final Throwable ex) {
+ // any failure is logged and reported to the caller as null rather than propagated
+ LOG.warn("Failed to create heartbeat due to: " + ex, ex);
+ return null;
}
}
@@ -3852,16 +3871,51 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return queues;
}
+ /**
+  * Records the connection status of the given node, unless a status with a higher Update ID has already been recorded.
+  *
+  * @param nodeId the identifier of the node whose status changed
+  * @param nodeStatus the new connection status of the node
+  * @param updateId the Update ID associated with the new status; used to detect stale notifications
+  */
+ public void setNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus nodeStatus, final Long updateId) {
+     // Status notifications come from the network, possibly across different TCP connections, so they can be
+     // received out of order. Each status is therefore stored together with its Update ID, which lets us recognize
+     // a notification that is older than what we already hold and drop it instead of overwriting newer state.
+     // NOTE(review): updateId is unboxed in the comparison below, so a null value would fail there - confirm callers never pass null.
+     final VersionedNodeConnectionStatus candidate = new VersionedNodeConnectionStatus(nodeStatus, updateId);
+
+     // Retry until the map has been updated atomically or the update turns out to be stale
+     while (true) {
+         final VersionedNodeConnectionStatus existing = nodeStatuses.putIfAbsent(nodeId, candidate);
+         if (existing == null) {
+             // Nothing was recorded for this node before, so the candidate is now in place.
+             LOG.info("Status of Node {} set to {}", nodeId, nodeStatus);
+             return;
+         }
+
+         if (updateId < existing.getUpdateId()) {
+             LOG.debug("Received notification that status of Node {} changed to {} but the status update was old. Ignoring update.", nodeId, nodeStatus);
+             return;
+         }
+
+         // Only swap if the entry is still the one we compared against; otherwise loop and re-evaluate
+         if (nodeStatuses.replace(nodeId, existing, candidate)) {
+             LOG.info("Status of {} changed from {} to {}", nodeId, existing.getStatus(), nodeStatus);
+             return;
+         }
+     }
+ }
+
+ /**
+  * Looks up the connection status currently recorded for the given node.
+  *
+  * @param nodeId the identifier of the node of interest
+  * @return the recorded status of the node, or <code>null</code> if no status is recorded for it
+  */
+ public NodeConnectionStatus getNodeStatus(final NodeIdentifier nodeId) {
+     final VersionedNodeConnectionStatus entry = nodeStatuses.get(nodeId);
+     if (entry == null) {
+         return null;
+     }
+     return entry.getStatus();
+ }
+
private static class HeartbeatBean {
private final ProcessGroup rootGroup;
private final boolean primary;
- private final boolean connected;
+ private final NodeConnectionStatus connectionStatus;
- public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final boolean connected) {
+ public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
this.rootGroup = rootGroup;
this.primary = primary;
- this.connected = connected;
+ this.connectionStatus = connectionStatus;
}
public ProcessGroup getRootGroup() {
@@ -3872,9 +3926,56 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return primary;
}
- public boolean isConnected() {
- return connected;
+ public NodeConnectionStatus getConnectionStatus() {
+ return connectionStatus;
}
}
+
+ /**
+  * A simple wrapper around a Node Connection Status and an Update ID. This is used as a value in a map so that we
+  * ensure that we update that Map only with newer versions. Two instances are equal when both their status and
+  * their Update ID are equal; equality and hashing both tolerate a null status or a null Update ID.
+  */
+ private static class VersionedNodeConnectionStatus {
+     private final NodeConnectionStatus status;
+     private final Long updateId;
+
+     public VersionedNodeConnectionStatus(final NodeConnectionStatus status, final Long updateId) {
+         this.status = status;
+         this.updateId = updateId;
+     }
+
+     /**
+      * @return the wrapped connection status
+      */
+     public NodeConnectionStatus getStatus() {
+         return status;
+     }
+
+     /**
+      * @return the Update ID that was supplied together with the status
+      */
+     public Long getUpdateId() {
+         return updateId;
+     }
+
+     @Override
+     public int hashCode() {
+         final int prime = 31;
+         int result = 1;
+         result = prime * result + ((status == null) ? 0 : status.hashCode());
+         result = prime * result + ((updateId == null) ? 0 : updateId.hashCode());
+         return result;
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null) {
+             return false;
+         }
+         if (getClass() != obj.getClass()) {
+             return false;
+         }
+
+         final VersionedNodeConnectionStatus other = (VersionedNodeConnectionStatus) obj;
+         // Null-safe comparison, so that equals() accepts the same null fields that hashCode() already tolerates
+         return java.util.Objects.equals(other.getStatus(), getStatus())
+             && java.util.Objects.equals(other.getUpdateId(), getUpdateId());
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
index 00c6c5d..c648664 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.cluster.ConnectionException;
+
/**
* Represents the exceptional case when a controller managing an existing flow fails to fully load a different flow.
*
*/
-public class FlowSynchronizationException extends RuntimeException {
+public class FlowSynchronizationException extends ConnectionException {
private static final long serialVersionUID = 109234802938L;
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 67d0338..45999a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -41,26 +41,31 @@ import java.util.zip.GZIPOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.ConnectionException;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
-import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
@@ -341,6 +346,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
case RECONNECTION_REQUEST:
case DISCONNECTION_REQUEST:
case FLOW_REQUEST:
+ case NODE_STATUS_CHANGE:
return true;
default:
return false;
@@ -354,6 +360,10 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
switch (request.getType()) {
case FLOW_REQUEST:
return handleFlowRequest((FlowRequestMessage) request);
+ case NODE_STATUS_CHANGE:
+ final NodeStatusChangeMessage statusChangeMsg = (NodeStatusChangeMessage) request;
+ controller.setNodeStatus(statusChangeMsg.getNodeId(), statusChangeMsg.getNodeConnectionStatus(), statusChangeMsg.getStatusUpdateIdentifier());
+ return null;
case RECONNECTION_REQUEST:
// Suspend heartbeats until we've reconnected. Otherwise,
// we may send a heartbeat while we are still in the process of
@@ -419,7 +429,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
// set node as clustered, since it is trying to connect to a cluster
controller.setClustered(true, null);
controller.setClusterManagerRemoteSiteInfo(null, null);
- controller.setConnected(false);
+ controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.NOT_YET_CONNECTED));
/*
* Start heartbeating. Heartbeats will fail because we can't reach
@@ -444,23 +454,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
loadFromConnectionResponse(response);
} catch (final ConnectionException ce) {
logger.error("Failed to load flow from cluster due to: " + ce, ce);
-
- /*
- * If we failed processing the response, then we want to notify
- * the manager so that it can mark the node as disconnected.
- */
- // create error message
- final ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
- msg.setExceptionMessage(ce.getMessage());
- msg.setNodeId(response.getNodeIdentifier());
-
- // send error message to manager
- try {
- senderListener.notifyControllerStartupFailure(msg);
- } catch (final ProtocolException | UnknownServiceAddressException e) {
- logger.warn("Failed to notify cluster manager of controller startup failure due to: " + e, e);
- }
-
+ handleConnectionFailure(ce);
throw new IOException(ce);
}
}
@@ -474,6 +468,33 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
}
}
+ /**
+  * Reports a failed connection attempt to the cluster by sending a heartbeat whose connection status
+  * describes the failure. Nothing is sent when the controller has no Heartbeater.
+  *
+  * @param ex the exception that caused the connection to fail; its string form is used as the explanation
+  */
+ private void handleConnectionFailure(final Exception ex) {
+     final Heartbeater heartbeater = controller.getHeartbeater();
+     if (heartbeater == null) {
+         // without a Heartbeater there is no channel over which to report the failure
+         return;
+     }
+
+     final HeartbeatMessage failureMessage = new HeartbeatMessage();
+
+     // both flow-related exceptions are reported as MISMATCHED_FLOWS; anything else is a startup failure
+     final boolean flowMismatch = ex instanceof UninheritableFlowException || ex instanceof FlowSynchronizationException;
+     final DisconnectionCode failureCode = flowMismatch ? DisconnectionCode.MISMATCHED_FLOWS : DisconnectionCode.STARTUP_FAILURE;
+     final NodeConnectionStatus failureStatus = new NodeConnectionStatus(failureCode, ex.toString());
+
+     try (final ByteArrayOutputStream payloadStream = new ByteArrayOutputStream()) {
+         // the heartbeat carries the marshalled form of a default-constructed payload
+         HeartbeatPayload.marshal(new HeartbeatPayload(), payloadStream);
+         final byte[] payloadBytes = payloadStream.toByteArray();
+
+         failureMessage.setHeartbeat(new Heartbeat(nodeId, false, failureStatus, payloadBytes));
+         heartbeater.send(failureMessage);
+     } catch (final Exception e) {
+         logger.error("Failed to notify Cluster Coordinator that Connection failed", e);
+     }
+ }
+
private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
readLock.lock();
try {
@@ -509,7 +530,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.info("Processing reconnection request from manager.");
// reconnect
- final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(),
+ final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(),
request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId());
connectionResponse.setClusterManagerDN(request.getRequestorDN());
loadFromConnectionResponse(connectionResponse);
@@ -520,21 +541,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
} catch (final Exception ex) {
// disconnect controller
if (controller.isClustered()) {
- disconnect();
+ disconnect("Failed to properly handle Reconnection request due to " + ex.toString());
}
logger.error("Handling reconnection request failed due to: " + ex, ex);
-
- final ReconnectionFailureMessage failureMessage = new ReconnectionFailureMessage();
- failureMessage.setNodeId(request.getNodeId());
- failureMessage.setExceptionMessage(ex.toString());
-
- // send error message to manager
- try {
- senderListener.notifyReconnectionFailure(failureMessage);
- } catch (final ProtocolException | UnknownServiceAddressException e) {
- logger.warn("Failed to notify cluster manager of controller reconnection failure due to: " + e, e);
- }
+ handleConnectionFailure(ex);
} finally {
writeLock.unlock();
}
@@ -544,20 +555,20 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
writeLock.lock();
try {
logger.info("Received disconnection request message from manager with explanation: " + request.getExplanation());
- disconnect();
+ disconnect(request.getExplanation());
} finally {
writeLock.unlock();
}
}
- private void disconnect() {
+ private void disconnect(final String explanation) {
writeLock.lock();
try {
logger.info("Disconnecting node.");
// mark node as not connected
- controller.setConnected(false);
+ controller.setConnectionStatus(new NodeConnectionStatus(DisconnectionCode.UNKNOWN, explanation));
// turn off primary flag
controller.setPrimary(false);
@@ -727,7 +738,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
controller.setClustered(true, response.getInstanceId(), response.getClusterManagerDN());
controller.setClusterManagerRemoteSiteInfo(response.getManagerRemoteInputPort(), response.isManagerRemoteCommsSecure());
- controller.setConnected(true);
+ controller.setConnectionStatus(new NodeConnectionStatus(NodeConnectionState.CONNECTED));
// start the processors as indicated by the dataflow
controller.onFlowInitialized(dataFlow.isAutoStartProcessors());
@@ -736,11 +747,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
loadSnippets(dataFlow.getSnippets());
controller.startHeartbeating();
} catch (final UninheritableFlowException ufe) {
- throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
+ throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow is different than cluster flow.", ufe);
} catch (final FlowSerializationException fse) {
throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local or cluster flow is malformed.", fse);
} catch (final FlowSynchronizationException fse) {
- throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated. Administrator should disconnect node and review flow for corruption.", fse);
+ throw new FlowSynchronizationException(CONNECTION_EXCEPTION_MSG_PREFIX + "local flow controller partially updated. "
+ + "Administrator should disconnect node and review flow for corruption.", fse);
} catch (final Exception ex) {
throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
} finally {
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
index 1b4f9d9..d7fd6d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/UninheritableFlowException.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.cluster.ConnectionException;
+
/**
* Represents the exceptional case when a controller is to be loaded with a flow that is fundamentally different than its existing flow.
*
*/
-public class UninheritableFlowException extends RuntimeException {
+public class UninheritableFlowException extends ConnectionException {
private static final long serialVersionUID = 198234798234794L;
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
new file mode 100644
index 0000000..e5379d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/Heartbeater.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+/**
+ * <p>
+ * A mechanism for sending a heartbeat to a remote resource to indicate
+ * that the node is still an active participant in the cluster.
+ * </p>
+ *
+ * <p>
+ * The interface extends {@link Closeable}, so every implementation also
+ * provides a {@code close()} method.
+ * </p>
+ */
+public interface Heartbeater extends Closeable {
+
+ /**
+ * Sends the given heartbeat to the remote resource.
+ *
+ * @param heartbeat the Heartbeat to send
+ * @throws IOException if unable to communicate with the remote resource
+ */
+ void send(HeartbeatMessage heartbeat) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
new file mode 100644
index 0000000..8571f51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperClientConfig.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Holds the settings used to connect to ZooKeeper: the connect string, the
+ * session and connection timeouts (in milliseconds), the root path and the
+ * ACLs. All fields are final and the constructor is private; instances are
+ * obtained from {@link #createConfig(Properties)}.
+ */
+public class ZooKeeperClientConfig {
+ private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class);
+
+ private final String connectString;
+ private final int sessionTimeoutMillis;
+ private final int connectionTimeoutMillis;
+ private final String rootPath; // stored without a trailing '/' (normalised in the constructor)
+ private final List<ACL> acls; // null when createConfig finds no recognised access-control value
+ // Private on purpose: the only caller in this class is createConfig(Properties).
+ private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath, List<ACL> acls) {
+ this.connectString = connectString;
+ this.sessionTimeoutMillis = sessionTimeoutMillis;
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
+ this.rootPath = rootPath.endsWith("/") ? rootPath.substring(0, rootPath.length() - 1) : rootPath; // drop one trailing '/' so resolvePath never produces "//"
+ this.acls = acls;
+ }
+ /** @return the ZooKeeper connect string this config was created with */
+ public String getConnectString() {
+ return connectString;
+ }
+ /** @return the session timeout, in milliseconds */
+ public int getSessionTimeoutMillis() {
+ return sessionTimeoutMillis;
+ }
+ /** @return the connection timeout, in milliseconds */
+ public int getConnectionTimeoutMillis() {
+ return connectionTimeoutMillis;
+ }
+ /** @return the root path, with a single trailing '/' removed if one was configured */
+ public String getRootPath() {
+ return rootPath;
+ }
+ /** @return the ACLs selected by createConfig, or null if none were selected */
+ public List<ACL> getACLs() {
+ return acls;
+ }
+ /**
+ * Prefixes the given path with the root path. A '/' separator is inserted
+ * only when the given path does not already start with one.
+ *
+ * @param path the path to place beneath the root path
+ * @return the root path followed by the given path
+ */
+ public String resolvePath(final String path) {
+ if (path.startsWith("/")) {
+ return rootPath + path;
+ }
+
+ return rootPath + "/" + path;
+ }
+ /**
+ * Builds a configuration from the given properties.
+ * <ul>
+ * <li>The connect string is mandatory.</li>
+ * <li>Session and connect timeouts are read through getTimePeriod, which
+ * falls back to the NiFiProperties defaults.</li>
+ * <li>The root node falls back to NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE
+ * and is checked with PathUtils.validatePath.</li>
+ * <li>Access control "Open" maps to Ids.OPEN_ACL_UNSAFE and "CreatorOnly" to
+ * Ids.CREATOR_ALL_ACL (both case-insensitive); anything else yields null ACLs.</li>
+ * </ul>
+ *
+ * @param properties the properties to read the ZooKeeper settings from
+ * @return a new ZooKeeperClientConfig
+ * @throws IllegalStateException if the connect string is missing or blank
+ * @throws IllegalArgumentException if the root node fails path validation
+ */
+ public static ZooKeeperClientConfig createConfig(final Properties properties) {
+ final String connectString = properties.getProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING);
+ if (connectString == null || connectString.trim().isEmpty()) {
+ throw new IllegalStateException("The '" + NiFiProperties.ZOOKEEPER_CONNECT_STRING + "' property is not set in nifi.properties");
+ }
+ // NOTE(review): getTimePeriod returns int; the values are widened to long here and cast back to int below.
+ final long sessionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+ final long connectionTimeoutMs = getTimePeriod(properties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
+ final String rootPath = properties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
+ final String accessControl = properties.getProperty(NiFiProperties.ZOOKEEPER_ACCESS_CONTROL);
+ // Map the access-control keyword to a ZooKeeper ACL list.
+ final List<ACL> acls;
+ if (accessControl == null || accessControl.trim().isEmpty()) {
+ acls = null;
+ } else if (accessControl.equalsIgnoreCase("Open")) {
+ acls = Ids.OPEN_ACL_UNSAFE;
+ } else if (accessControl.equalsIgnoreCase("CreatorOnly")) {
+ acls = Ids.CREATOR_ALL_ACL;
+ } else {
+ acls = null; // NOTE(review): an unrecognised value is ignored silently, with no warning logged - confirm this is intended
+ }
+ // Validate before constructing; NOTE(review): the rethrown exception does not chain 'e' as its cause.
+ try {
+ PathUtils.validatePath(rootPath);
+ } catch (final IllegalArgumentException e) {
+ throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
+ }
+
+ return new ZooKeeperClientConfig(connectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath, acls);
+ }
+ /**
+ * Reads a time-period property and converts it to milliseconds with
+ * FormatUtils.getTimeDuration. If the property is absent the default value
+ * is used; if the conversion throws, a warning is logged and the default
+ * value is converted instead.
+ *
+ * @param properties the properties to read from
+ * @param propertyName the name of the property holding the time period
+ * @param defaultValue the time period to use when the property is absent or invalid
+ * @return the time period in milliseconds, narrowed to int by an explicit cast
+ */
+ private static int getTimePeriod(final Properties properties, final String propertyName, final String defaultValue) {
+ final String timeout = properties.getProperty(propertyName, defaultValue);
+ try {
+ return (int) FormatUtils.getTimeDuration(timeout, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ logger.warn("Value of '" + propertyName + "' property is set to '" + timeout + "', which is not a valid time period. Using default of " + defaultValue);
+ return (int) FormatUtils.getTimeDuration(defaultValue, TimeUnit.MILLISECONDS); // not guarded: an invalid default would propagate its exception
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
new file mode 100644
index 0000000..4348cec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ZooKeeperHeartbeater.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.cluster;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.protocol.NodeProtocolSender;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses ZooKeeper in order to determine which node is the elected Cluster Coordinator and to indicate
+ * that this node is part of the cluster. However, once the Cluster Coordinator is known, heartbeats are
+ * sent directly to the Cluster Coordinator.
+ *
+ * <p>
+ * The coordinator address is read from ZooKeeper on demand and cached in a volatile field. The cache
+ * is cleared by a ZooKeeper watcher registered with the read, and also when sending a heartbeat fails
+ * with a ProtocolException whose cause is an IOException. {@link #send(HeartbeatMessage)} is synchronized.
+ * </p>
+ */
+public class ZooKeeperHeartbeater implements Heartbeater {
+ private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHeartbeater.class);
+
+ private final NodeProtocolSender protocolSender; // performs the actual heartbeat transmission
+ private final CuratorFramework curatorClient; // created and started in the constructor, closed in close()
+ private final String nodesPathPrefix; // resolved ZooKeeper path for "cluster/nodes"
+
+ private final String coordinatorPath; // nodesPathPrefix + "/coordinator"
+ private volatile String coordinatorAddress; // cached coordinator address; null means "look it up again"
+
+ /**
+ * Creates the heartbeater, builds a Curator client from the ZooKeeper settings found in the given
+ * properties, and starts that client.
+ *
+ * @param protocolSender the sender used to deliver heartbeat messages
+ * @param properties the properties from which the ZooKeeper client configuration is created
+ */
+ public ZooKeeperHeartbeater(final NodeProtocolSender protocolSender, final Properties properties) {
+ this.protocolSender = protocolSender;
+
+ final RetryPolicy retryPolicy = new RetryForever(5000); // presumably 5000 ms between retries - confirm against Curator's RetryForever
+ final ZooKeeperClientConfig zkConfig = ZooKeeperClientConfig.createConfig(properties);
+
+ curatorClient = CuratorFrameworkFactory.newClient(zkConfig.getConnectString(),
+ zkConfig.getSessionTimeoutMillis(), zkConfig.getConnectionTimeoutMillis(), retryPolicy);
+
+ curatorClient.start();
+ nodesPathPrefix = zkConfig.resolvePath("cluster/nodes");
+ coordinatorPath = nodesPathPrefix + "/coordinator";
+ }
+ /**
+ * Returns the address heartbeats should be sent to. The cached value is returned when present;
+ * otherwise the data stored at coordinatorPath is read, decoded as UTF-8, cached and returned.
+ * The read registers a watcher that clears the cache when it is notified.
+ *
+ * @return the Cluster Coordinator address
+ * @throws IOException if the address cannot be obtained; the original exception is attached as the cause
+ */
+ private String getHeartbeatAddress() throws IOException {
+ final String curAddress = coordinatorAddress; // read the volatile field once so the check and return see the same value
+ if (curAddress != null) {
+ return curAddress;
+ }
+
+ try {
+ // Get coordinator address and add watcher to change who we are heartbeating to if the value changes.
+ final byte[] coordinatorAddressBytes = curatorClient.getData().usingWatcher(new Watcher() {
+ @Override
+ public void process(final WatchedEvent event) {
+ coordinatorAddress = null; // clears the cache for every event delivered, whatever its type
+ }
+ }).forPath(coordinatorPath);
+ final String address = coordinatorAddress = new String(coordinatorAddressBytes, StandardCharsets.UTF_8); // NOTE(review): if the watcher runs before this assignment, a stale address may stay cached - confirm
+
+ logger.info("Determined that Cluster Coordinator is located at {}; will use this address for sending heartbeat messages", address);
+ return address;
+ } catch (Exception e) {
+ throw new IOException("Unable to determine Cluster Coordinator from ZooKeeper", e);
+ }
+ }
+ /**
+ * Sends the heartbeat to the address returned by getHeartbeatAddress(), using the protocol sender.
+ * A ProtocolException is always rethrown; when its cause is an IOException the cached coordinator
+ * address is cleared first.
+ *
+ * @param heartbeatMessage the heartbeat to send
+ * @throws IOException if the Cluster Coordinator address cannot be determined
+ */
+ @Override
+ public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
+ final String heartbeatAddress = getHeartbeatAddress();
+
+ try {
+ protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
+ } catch (final ProtocolException pe) {
+ // a ProtocolException is likely the result of not being able to communicate
+ // with the coordinator. If we do get an IOException communicating with the coordinator,
+ // it will be the cause of the Protocol Exception. In this case, set coordinatorAddress
+ // to null so that we double-check next time that the coordinator has not changed.
+ if (pe.getCause() instanceof IOException) {
+ coordinatorAddress = null;
+ }
+
+ throw pe;
+ }
+ }
+
+ /**
+ * Closes the Curator client and logs that the heartbeater has been closed.
+ * NOTE(review): the log message says heartbeats go "to ZooKeeper", while send() delivers them
+ * through the protocol sender to the coordinator address - consider rewording the message.
+ */
+ @Override
+ public void close() throws IOException {
+ if (curatorClient != null) { // defensive: the field is final and assigned in the constructor
+ curatorClient.close();
+ }
+
+ logger.info("ZooKeeper heartbeater closed. Will no longer send Heartbeat messages to ZooKeeper");
+ }
+}