You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/22 21:01:49 UTC
[3/4] nifi git commit: NIFI-1678: Started refactoring heartbeating
mechanism, using a new package: org.apache.nifi.cluster.coordination
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
new file mode 100644
index 0000000..d6838cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for heartbeat monitors: periodically fetches the latest heartbeat of every node (as supplied
+ * by {@link #getLatestHeartbeats()}), reconciles each one with the Cluster Coordinator, and asks nodes whose
+ * heartbeat has gone stale to disconnect.
+ */
+public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
+
+    /**
+     * Number of heartbeat intervals that may pass without a new heartbeat from a node before that node
+     * is asked to disconnect due to lack of heartbeat.
+     */
+    private static final int MAX_MISSED_HEARTBEAT_INTERVALS = 8;
+
+    private final int heartbeatIntervalMillis;
+    protected final ClusterCoordinator clusterCoordinator;
+    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
+
+    // NOTE(review): this class never reads or assigns this field; it is kept only because it is protected
+    // and subclasses may use it. Heartbeat expiration is measured against the current time instead.
+    protected volatile long latestHeartbeatTime;
+    private volatile ScheduledFuture<?> future;
+    private volatile boolean stopped = true;
+
+    /**
+     * Creates a monitor that polls once per configured cluster protocol heartbeat interval.
+     *
+     * @param clusterCoordinator the coordinator that is informed of decisions made from heartbeats
+     * @param properties the properties from which {@link NiFiProperties#CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL} is read;
+     *            {@link NiFiProperties#DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL} is used when it is not set
+     */
+    public AbstractHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+        this.clusterCoordinator = clusterCoordinator;
+        final String heartbeatInterval = properties.getProperty(NiFiProperties.CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL,
+            NiFiProperties.DEFAULT_CLUSTER_PROTOCOL_HEARTBEAT_INTERVAL);
+        this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(heartbeatInterval, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Clears the stopped flag, invokes {@link #onStart()}, and then schedules {@link #monitorHeartbeats()}
+     * with a fixed delay of one heartbeat interval (the first run happens after one interval). Any Exception
+     * thrown by a monitoring pass is reported to the Cluster Coordinator and logged rather than propagated
+     * out of the scheduled task.
+     */
+    @Override
+    public final void start() {
+        stopped = false;
+        onStart();
+
+        this.future = flowEngine.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    monitorHeartbeats();
+                } catch (final Exception e) {
+                    clusterCoordinator.reportEvent(null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e.toString());
+                    logger.error("Failed to process heartbeats", e);
+                }
+            }
+        }, heartbeatIntervalMillis, heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Sets the stopped flag and cancels (with interruption) the scheduled monitoring task, if any.
+     * {@link #onStop()} is always invoked, even if cancellation throws.
+     */
+    @Override
+    public final void stop() {
+        this.stopped = true;
+
+        try {
+            if (future != null) {
+                future.cancel(true);
+            }
+        } finally {
+            onStop();
+        }
+    }
+
+    /**
+     * @return <code>true</code> before {@link #start()} is first called and after {@link #stop()} is called
+     */
+    protected boolean isStopped() {
+        return stopped;
+    }
+
+    /**
+     * Looks the node up in {@link #getLatestHeartbeats()}. A <code>null</code> map from the subclass is
+     * treated the same as a map without an entry for the node.
+     */
+    @Override
+    public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier nodeId) {
+        final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
+        return latestHeartbeats == null ? null : latestHeartbeats.get(nodeId);
+    }
+
+    protected ClusterCoordinator getClusterCoordinator() {
+        return clusterCoordinator;
+    }
+
+    /**
+     * @param timeUnit the unit in which the interval should be expressed
+     * @return the configured heartbeat interval converted to the given unit
+     */
+    protected long getHeartbeatInterval(final TimeUnit timeUnit) {
+        return timeUnit.convert(heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Fetches all of the latest heartbeats and updates the Cluster Coordinator as appropriate,
+     * based on the heartbeats received. Nodes whose latest heartbeat is older than
+     * {@value #MAX_MISSED_HEARTBEAT_INTERVALS} heartbeat intervals are asked to disconnect and
+     * their heartbeat is removed.
+     *
+     * Visible for testing.
+     */
+    protected synchronized void monitorHeartbeats() {
+        final Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
+        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
+            // failed to fetch heartbeats; don't change anything.
+            clusterCoordinator.reportEvent(null, Severity.WARNING, "Failed to retrieve any new heartbeat information for nodes. "
+                + "Will not make any decisions based on heartbeats.");
+            return;
+        }
+
+        // Capture the count up front: processHeartbeat() may remove heartbeats, and the map returned by a
+        // subclass may be a live view of its backing store, so its size can shrink during processing.
+        final int heartbeatCount = latestHeartbeats.size();
+        final StopWatch procStopWatch = new StopWatch(true);
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            try {
+                processHeartbeat(heartbeat);
+            } catch (final Exception e) {
+                clusterCoordinator.reportEvent(null, Severity.ERROR,
+                    "Received heartbeat from " + heartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e);
+                logger.error("Failed to process heartbeat from {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                logger.error("", e);
+            }
+        }
+
+        procStopWatch.stop();
+        logger.info("Finished processing {} heartbeats in {}", heartbeatCount, procStopWatch.getDuration());
+
+        // Disconnect any node that hasn't sent a heartbeat in a long time (8 times the heartbeat interval).
+        // heartbeatIntervalMillis is already in milliseconds, so it must not be scaled by 1000 again. The cutoff
+        // is measured back from "now": latestHeartbeatTime is never assigned in this class, so basing the cutoff
+        // on it would put the threshold at or below zero and no heartbeat would ever expire.
+        final long maxMillis = heartbeatIntervalMillis * (long) MAX_MISSED_HEARTBEAT_INTERVALS;
+        final long threshold = System.currentTimeMillis() - maxMillis;
+        for (final NodeHeartbeat heartbeat : latestHeartbeats.values()) {
+            if (heartbeat.getTimestamp() < threshold) {
+                clusterCoordinator.requestNodeDisconnect(heartbeat.getNodeIdentifier(), DisconnectionCode.LACK_OF_HEARTBEAT,
+                    "Latest heartbeat from Node has expired");
+
+                try {
+                    removeHeartbeat(heartbeat.getNodeIdentifier());
+                } catch (final Exception e) {
+                    logger.warn("Failed to remove heartbeat for {} due to {}", heartbeat.getNodeIdentifier(), e.toString());
+                    logger.warn("", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reconciles a single heartbeat with the Cluster Coordinator's view of the node that sent it:
+     * disconnects firewall-blocked nodes, asks unknown or out-of-sync nodes to (re)connect, honors
+     * disconnections requested by the node, completes pending connections, and records the primary node.
+     *
+     * @param heartbeat the heartbeat to process
+     */
+    private void processHeartbeat(final NodeHeartbeat heartbeat) {
+        final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+
+        // Do not process heartbeat if it's blocked by firewall.
+        if (clusterCoordinator.isBlockedByFirewall(nodeId.getSocketAddress())) {
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
+
+            // request node to disconnect
+            clusterCoordinator.requestNodeDisconnect(nodeId, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        // connectionStatus is the coordinator's view; reportedStatus is what the node claims in its heartbeat.
+        final NodeConnectionStatus connectionStatus = clusterCoordinator.getConnectionStatus(nodeId);
+        final NodeConnectionStatus reportedStatus = heartbeat.getConnectionStatus();
+        if (connectionStatus == null) {
+            final NodeConnectionState hbConnectionState = reportedStatus.getState();
+            if (hbConnectionState == NodeConnectionState.DISCONNECTED || hbConnectionState == NodeConnectionState.DISCONNECTING) {
+                // Node is not part of the cluster. Remove heartbeat and move on.
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // Unknown node. Issue reconnect request
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from unknown node. Removing heartbeat and requesting that node connect to cluster.");
+            removeHeartbeat(nodeId);
+
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        final DisconnectionCode reportedDisconnectCode = reportedStatus.getDisconnectCode();
+        if (reportedDisconnectCode != null) {
+            // Check if the node is notifying us that it wants to disconnect from the cluster
+            final boolean requestingDisconnect;
+            switch (reportedDisconnectCode) {
+                case MISMATCHED_FLOWS:
+                case NODE_SHUTDOWN:
+                case STARTUP_FAILURE:
+                    // Only act on it while the coordinator still considers the node (being) connected.
+                    final NodeConnectionState expectedState = connectionStatus.getState();
+                    requestingDisconnect = expectedState == NodeConnectionState.CONNECTED || expectedState == NodeConnectionState.CONNECTING;
+                    break;
+                default:
+                    requestingDisconnect = false;
+                    break;
+            }
+
+            if (requestingDisconnect) {
+                clusterCoordinator.disconnectionRequestedByNode(nodeId, reportedDisconnectCode, reportedStatus.getDisconnectReason());
+                removeHeartbeat(nodeId);
+                return;
+            }
+        }
+
+        final NodeConnectionState connectionState = connectionStatus.getState();
+        final NodeConnectionState reportedState = reportedStatus.getState();
+        if (reportedState != NodeConnectionState.CONNECTED && connectionState == NodeConnectionState.CONNECTED) {
+            // Cluster Coordinator believes that node is connected, but node does not believe so.
+            clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster, "
+                + "though the Cluster Coordinator thought it was (node claimed state was " + reportedState
+                + "). Marking as Disconnected and requesting that Node reconnect to cluster");
+            clusterCoordinator.requestNodeConnect(nodeId);
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTED == connectionState) {
+            // A node that was disconnected because its heartbeats stopped is invited to reconnect once it
+            // heartbeats again; a node disconnected for any other reason is told to disconnect again.
+            final DisconnectionCode disconnectionCode = connectionStatus.getDisconnectCode();
+
+            if (disconnectionCode == DisconnectionCode.LACK_OF_HEARTBEAT) {
+                // record event
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat from node previously "
+                    + "disconnected due to lack of heartbeat. Issuing reconnection request.");
+
+                clusterCoordinator.requestNodeConnect(nodeId);
+            } else {
+                // disconnected nodes should not heartbeat, so we need to issue a disconnection request
+                logger.info("Ignoring received heartbeat from disconnected node {}. Issuing disconnection request.", nodeId);
+                clusterCoordinator.requestNodeDisconnect(nodeId, disconnectionCode, connectionStatus.getDisconnectReason());
+                removeHeartbeat(nodeId);
+            }
+
+            return;
+        }
+
+        if (NodeConnectionState.DISCONNECTING == connectionState) {
+            // ignore spurious heartbeat
+            removeHeartbeat(nodeId);
+            return;
+        }
+
+        // first heartbeat causes status change from connecting to connected
+        if (NodeConnectionState.CONNECTING == connectionState) {
+            final Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
+            if (connectionRequestTime != null && heartbeat.getTimestamp() < connectionRequestTime) {
+                clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
+                removeHeartbeat(nodeId);
+                return;
+            }
+
+            // connection complete
+            clusterCoordinator.finishNodeConnection(nodeId);
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
+        }
+
+        if (heartbeat.isPrimary()) {
+            clusterCoordinator.setPrimaryNode(nodeId);
+        }
+    }
+
+
+    /**
+     * @return the most recent heartbeat information for each node in the cluster. A <code>null</code> or
+     *         empty map is treated by {@link #monitorHeartbeats()} as "no heartbeat information available".
+     */
+    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is started.
+     */
+    protected void onStart() {
+    }
+
+    /**
+     * This method does nothing in the abstract class but is meant for subclasses to
+     * override in order to provide functionality when the monitor is stopped.
+     */
+    protected void onStop() {
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
new file mode 100644
index 0000000..d9ef0be
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryForever;
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.ProtocolHandler;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
+import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses Apache Curator to monitor heartbeats from nodes
+ */
+public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
+ protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
+ private static final String COORDINATOR_ZNODE_NAME = "coordinator";
+
+ private final ZooKeeperClientConfig zkClientConfig;
+ private final String clusterNodesPath;
+
+ private volatile CuratorFramework curatorClient;
+ private volatile Map<String, NodeIdentifier> clusterNodeIds = new HashMap<>();
+
+ private final String heartbeatAddress;
+ private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages = new ConcurrentHashMap<>();
+
+ protected static final Unmarshaller nodeIdentifierUnmarshaller;
+
+ static {
+ try {
+ final JAXBContext jaxbContext = JAXBContext.newInstance(NodeIdentifier.class);
+ nodeIdentifierUnmarshaller = jaxbContext.createUnmarshaller();
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Node Identifier", e);
+ }
+ }
+
+
+ public ClusterProtocolHeartbeatMonitor(final ClusterCoordinator clusterCoordinator, final Properties properties) {
+ super(clusterCoordinator, properties);
+
+ this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
+ this.clusterNodesPath = zkClientConfig.resolvePath("cluster/nodes");
+
+ String hostname = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_ADDRESS);
+ if (hostname == null) {
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException("Unable to determine local hostname and the '" + NiFiProperties.CLUSTER_MANAGER_ADDRESS + "' property is not set");
+ }
+ }
+
+ final String port = properties.getProperty(NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT);
+ if (port == null) {
+ throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+ + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is not set");
+ }
+
+ try {
+ Integer.parseInt(port);
+ } catch (final NumberFormatException nfe) {
+ throw new RuntimeException("Unable to determine which port Cluster Manager Protocol is listening on because the '"
+ + NiFiProperties.CLUSTER_MANAGER_PROTOCOL_PORT + "' property is set to '" + port + "', which is not a valid port number.");
+ }
+
+ heartbeatAddress = hostname + ":" + port;
+ }
+
+ @Override
+ public void onStart() {
+ final RetryPolicy retryPolicy = new RetryForever(5000);
+ curatorClient = CuratorFrameworkFactory.newClient(zkClientConfig.getConnectString(),
+ zkClientConfig.getSessionTimeoutMillis(), zkClientConfig.getConnectionTimeoutMillis(), retryPolicy);
+ curatorClient.start();
+
+ final Thread publishAddress = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!isStopped()) {
+ final String path = clusterNodesPath + "/" + COORDINATOR_ZNODE_NAME;
+ try {
+ try {
+ curatorClient.setData().forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+ return;
+ } catch (final NoNodeException nne) {
+ // ensure that parents are created, using a wide-open ACL because the parents contain no data
+ // and the path is not in any way sensitive.
+ try {
+ curatorClient.create().creatingParentContainersIfNeeded().forPath(path);
+ } catch (final NodeExistsException nee) {
+ // This is okay. Node already exists.
+ }
+
+ curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, heartbeatAddress.getBytes(StandardCharsets.UTF_8));
+ logger.info("Successfully created node in ZooKeeper with path {}", path);
+
+ return;
+ }
+ } catch (Exception e) {
+ logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry.");
+
+ try {
+ Thread.sleep(2000L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+ }
+ });
+
+ publishAddress.setName("Publish Heartbeat Address");
+ publishAddress.setDaemon(true);
+ publishAddress.start();
+ }
+
+ private CuratorFramework getClient() {
+ return curatorClient;
+ }
+
+ @Override
+ public void onStop() {
+ final CuratorFramework client = getClient();
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
+ return Collections.unmodifiableMap(heartbeatMessages);
+ }
+
+ @Override
+ public synchronized void removeHeartbeat(final NodeIdentifier nodeId) {
+ logger.debug("Deleting heartbeat for node {}", nodeId);
+ final String nodeInfoPath = clusterNodesPath + "/" + nodeId.getId();
+
+ heartbeatMessages.remove(nodeId);
+
+ try {
+ getClient().delete().forPath(nodeInfoPath);
+ logger.info("Removed heartbeat from ZooKeeper for Node {}", nodeId);
+ } catch (final NoNodeException e) {
+ // node did not exist. Just return.
+ logger.debug("Attempted to remove heartbeat for Node with ID {} but no ZNode existed at {}", nodeId, nodeInfoPath);
+ return;
+ } catch (final Exception e) {
+ logger.warn("Failed to remove heartbeat from ZooKeeper for Node {} due to {}", nodeId, e);
+ logger.warn("", e);
+
+ clusterCoordinator.reportEvent(nodeId, Severity.WARNING, "Failed to remove node's heartbeat from ZooKeeper due to " + e);
+ }
+ }
+
+ protected Set<NodeIdentifier> getClusterNodeIds() {
+ return new HashSet<>(clusterNodeIds.values());
+ }
+
+
+ @Override
+ public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
+ if (msg.getType() != MessageType.HEARTBEAT) {
+ throw new ProtocolException("Cannot handle message of type " + msg.getType());
+ }
+
+ final HeartbeatMessage heartbeatMsg = (HeartbeatMessage) msg;
+ final Heartbeat heartbeat = heartbeatMsg.getHeartbeat();
+
+ final NodeIdentifier nodeId = heartbeat.getNodeIdentifier();
+ final NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
+ final boolean primary = heartbeat.isPrimary();
+ final byte[] payloadBytes = heartbeat.getPayload();
+ final HeartbeatPayload payload = HeartbeatPayload.unmarshal(payloadBytes);
+ final int activeThreadCount = payload.getActiveThreadCount();
+ final int flowFileCount = (int) payload.getTotalFlowFileCount();
+ final long flowFileBytes = payload.getTotalFlowFileBytes();
+ final long systemStartTime = payload.getSystemStartTime();
+
+ final NodeHeartbeat nodeHeartbeat = new StandardNodeHeartbeat(nodeId, System.currentTimeMillis(),
+ connectionStatus, primary, flowFileCount, flowFileBytes, activeThreadCount, systemStartTime);
+ heartbeatMessages.put(heartbeat.getNodeIdentifier(), nodeHeartbeat);
+ logger.debug("Received new heartbeat from {}", nodeId);
+
+ return null;
+ }
+
+ @Override
+ public boolean canHandle(ProtocolMessage msg) {
+ return msg.getType() == MessageType.HEARTBEAT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
new file mode 100644
index 0000000..c151382
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/HeartbeatMonitor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * Watches for the heartbeats that the nodes of a cluster deliver to some remote resource, and reacts
+ * when they arrive (or fail to arrive).
+ */
+public interface HeartbeatMonitor {
+
+    /**
+     * Starts watching for heartbeats.
+     */
+    void start();
+
+    /**
+     * Stops watching for heartbeats.
+     */
+    void stop();
+
+    /**
+     * Discards the heartbeat held for a node, both inside this monitor and at the remote
+     * location to which heartbeats are delivered.
+     *
+     * @param nodeId identifies the node whose heartbeat is to be discarded
+     */
+    void removeHeartbeat(NodeIdentifier nodeId);
+
+    /**
+     * Looks up the newest heartbeat obtained from a node.
+     *
+     * @param nodeId identifies the node whose heartbeat is wanted
+     * @return the newest heartbeat obtained for that node, or <code>null</code> when none has been obtained
+     */
+    NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeId);
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
new file mode 100644
index 0000000..bd66022
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/NodeHeartbeat.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+/**
+ * A single heartbeat received from a cluster node, together with the statistics it carried.
+ */
+public interface NodeHeartbeat {
+
+    /**
+     * @return identifies the node that sent this heartbeat
+     */
+    NodeIdentifier getNodeIdentifier();
+
+    /**
+     * @return when the heartbeat was reported, measured by the clock of the system that received it
+     */
+    long getTimestamp();
+
+    /**
+     * @return the connection status that the node reported about itself
+     */
+    NodeConnectionStatus getConnectionStatus();
+
+    /**
+     * @return <code>true</code> when the node is the cluster's Primary Node, otherwise <code>false</code>
+     */
+    boolean isPrimary();
+
+    /**
+     * @return how many threads are actively running in Processors and Reporting Tasks on the node
+     */
+    int getActiveThreadCount();
+
+    /**
+     * @return how many FlowFiles are queued on the node
+     */
+    int getFlowFileCount();
+
+    /**
+     * @return the combined size of every FlowFile queued on the node
+     */
+    long getFlowFileBytes();
+
+    /**
+     * @return when the node reports that NiFi was started
+     */
+    long getSystemStartTime();
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
new file mode 100644
index 0000000..133bab0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/StandardNodeHeartbeat.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.heartbeat;
+
+import org.apache.nifi.cluster.HeartbeatPayload;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.Heartbeat;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
+
+/**
+ * {@link NodeHeartbeat} implementation that holds, in final fields, the values supplied at construction.
+ */
+public class StandardNodeHeartbeat implements NodeHeartbeat {
+
+    private final NodeIdentifier nodeId;
+    private final long timestamp;
+    private final NodeConnectionStatus connectionStatus;
+    private final boolean primary;
+    private final int flowFileCount;
+    private final long flowFileBytes;
+    private final int activeThreadCount;
+    private final long systemStartTime;
+
+    /**
+     * @param nodeId the identifier of the node that sent the heartbeat
+     * @param timestamp the time at which the heartbeat was reported, according to the receiving system
+     * @param connectionStatus the connection status reported by the node
+     * @param primary whether the node reports itself as the Primary Node
+     * @param flowFileCount the number of FlowFiles queued on the node
+     * @param flowFileBytes the total size of the FlowFiles queued on the node
+     * @param activeThreadCount the number of active threads on the node
+     * @param systemStartTime the time at which the node reports having started NiFi
+     */
+    public StandardNodeHeartbeat(final NodeIdentifier nodeId, final long timestamp, final NodeConnectionStatus connectionStatus,
+        final boolean primary, final int flowFileCount, final long flowFileBytes, final int activeThreadCount, final long systemStartTime) {
+        this.nodeId = nodeId;
+        this.timestamp = timestamp;
+        this.connectionStatus = connectionStatus;
+        this.primary = primary;
+        this.flowFileCount = flowFileCount;
+        this.flowFileBytes = flowFileBytes;
+        this.activeThreadCount = activeThreadCount;
+        this.systemStartTime = systemStartTime;
+    }
+
+    @Override
+    public NodeIdentifier getNodeIdentifier() {
+        return nodeId;
+    }
+
+    @Override
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public NodeConnectionStatus getConnectionStatus() {
+        return connectionStatus;
+    }
+
+    @Override
+    public boolean isPrimary() {
+        return primary;
+    }
+
+    @Override
+    public int getFlowFileCount() {
+        return flowFileCount;
+    }
+
+    @Override
+    public long getFlowFileBytes() {
+        return flowFileBytes;
+    }
+
+    @Override
+    public int getActiveThreadCount() {
+        return activeThreadCount;
+    }
+
+    @Override
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    /**
+     * Builds a heartbeat from a protocol {@link HeartbeatMessage}, unmarshalling its payload to obtain the
+     * FlowFile and thread statistics.
+     *
+     * @param message the received heartbeat message
+     * @param timestamp the time at which the heartbeat was received, according to the receiving system
+     * @return a new heartbeat holding the values carried by the message
+     */
+    public static StandardNodeHeartbeat fromHeartbeatMessage(final HeartbeatMessage message, final long timestamp) {
+        final Heartbeat heartbeat = message.getHeartbeat();
+        final HeartbeatPayload payload = HeartbeatPayload.unmarshal(heartbeat.getPayload());
+
+        return new StandardNodeHeartbeat(heartbeat.getNodeIdentifier(), timestamp, heartbeat.getConnectionStatus(),
+            heartbeat.isPrimary(), saturatedCast(payload.getTotalFlowFileCount()), payload.getTotalFlowFileBytes(),
+            payload.getActiveThreadCount(), payload.getSystemStartTime());
+    }
+
+    /**
+     * Narrows a long to an int, clamping to the int range instead of wrapping around the way a plain
+     * <code>(int)</code> cast does (which turns 2^31 into a negative count).
+     */
+    private static int saturatedCast(final long value) {
+        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
new file mode 100644
index 0000000..d04d144
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/ClusterNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+
+public class ClusterNode {
+ private final NodeIdentifier nodeId;
+ // initial status: DISCONNECTED with disconnection code NOT_YET_CONNECTED
+ private NodeConnectionStatus connectionStatus = new NodeConnectionStatus(NodeConnectionState.DISCONNECTED, DisconnectionCode.NOT_YET_CONNECTED);
+
+ public ClusterNode(final NodeIdentifier identifier) {
+ this.nodeId = identifier;
+ }
+
+ public NodeIdentifier getIdentifier() {
+ return this.nodeId;
+ }
+
+ public NodeConnectionStatus getConnectionStatus() {
+ return this.connectionStatus;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 27ada88..de3c23e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -29,7 +29,6 @@ import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.reporting.BulletinRepository;
@@ -49,14 +48,6 @@ import org.apache.nifi.reporting.BulletinRepository;
public interface ClusterManager extends NodeInformant {
/**
- * Handles a node's heartbeat.
- *
- * @param heartbeat a heartbeat
- *
- */
- void handleHeartbeat(Heartbeat heartbeat);
-
- /**
* @param statuses the statuses of the nodes
* @return the set of nodes
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb7b3fe4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index aefd307..d4ea1d4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -33,16 +33,12 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.Queue;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@@ -71,9 +67,11 @@ import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
-import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
+import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
+import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
@@ -104,7 +102,6 @@ import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
-import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
@@ -113,12 +110,9 @@ import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListene
import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
@@ -290,7 +284,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static final String BULLETIN_CATEGORY = "Clustering";
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
- private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
/**
* The HTTP header to store a cluster context. An example of what may be stored in the context is a node's auditable actions in response to a cluster request. The cluster context is serialized
@@ -385,10 +378,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
private final ClusterManagerProtocolSenderListener senderListener;
private final OptimisticLockingManager optimisticLockingManager;
private final StringEncryptor encryptor;
- private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock(true);
private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
+ private final ClusterProtocolHeartbeatMonitor heartbeatMonitor;
+ private final WebClusterManagerCoordinator clusterCoordinator;
private final Set<Node> nodes = new HashSet<>();
private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
@@ -396,8 +390,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// null means the dataflow should be read from disk
private StandardDataFlow cachedDataFlow = null;
private NodeIdentifier primaryNodeId = null;
- private Timer heartbeatMonitor;
- private Timer heartbeatProcessor;
private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
private volatile EventManager eventManager = null;
private volatile ClusterNodeFirewall clusterFirewall = null;
@@ -492,6 +484,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
+
+ clusterCoordinator = new WebClusterManagerCoordinator(this, senderListener);
+ heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(clusterCoordinator, properties);
+ senderListener.addHandler(heartbeatMonitor);
}
public void start() throws IOException {
@@ -502,13 +498,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
try {
- // setup heartbeat monitoring
- heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
- heartbeatMonitor.schedule(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
-
- heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
- final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
- heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
+ heartbeatMonitor.start();
// start request replication service
httpRequestReplicator.start();
@@ -572,16 +562,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
boolean encounteredException = false;
- // stop the heartbeat monitoring
- if (isHeartbeatMonitorRunning()) {
- heartbeatMonitor.cancel();
- heartbeatMonitor = null;
- }
-
- if (heartbeatProcessor != null) {
- heartbeatProcessor.cancel();
- heartbeatProcessor = null;
- }
+ heartbeatMonitor.stop();
// stop the HTTP request replicator service
if (httpRequestReplicator.isRunning()) {
@@ -628,8 +609,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public boolean isRunning() {
readLock.lock();
try {
- return isHeartbeatMonitorRunning()
- || httpRequestReplicator.isRunning()
+ return httpRequestReplicator.isRunning()
|| senderListener.isRunning()
|| dataFlowManagementService.isRunning()
|| isBroadcasting();
@@ -640,10 +620,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public boolean canHandle(ProtocolMessage msg) {
- return MessageType.CONNECTION_REQUEST == msg.getType()
- || MessageType.HEARTBEAT == msg.getType()
- || MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
- || MessageType.RECONNECTION_FAILURE == msg.getType();
+ return MessageType.CONNECTION_REQUEST == msg.getType();
}
@Override
@@ -651,31 +628,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
switch (protocolMessage.getType()) {
case CONNECTION_REQUEST:
return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
- case HEARTBEAT:
- final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
-
- final Heartbeat original = heartbeatMessage.getHeartbeat();
- final NodeIdentifier originalNodeId = original.getNodeIdentifier();
- final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
-
- handleHeartbeat(heartbeatWithDn);
- return null;
- case CONTROLLER_STARTUP_FAILURE:
- new Thread(new Runnable() {
- @Override
- public void run() {
- handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
- }
- }, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
- return null;
- case RECONNECTION_FAILURE:
- new Thread(new Runnable() {
- @Override
- public void run() {
- handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
- }
- }, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
- return null;
default:
throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
}
@@ -758,16 +710,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
addEvent(node.getNodeId(), "Connection requested from new node. Setting status to connecting.");
nodes.add(node);
} else {
- node.setStatus(Status.CONNECTING);
+ clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
addEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting");
}
// record the time of the connection request
node.setConnectionRequestedTimestamp(new Date().getTime());
- // clear out old heartbeat info
- node.setHeartbeat(null);
-
// try to obtain a current flow
if (dataFlowManagementService.isFlowCurrent()) {
// if a cached copy does not exist, load it from disk
@@ -777,17 +726,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
}
- // determine if this node should be assigned the primary role
- final boolean primaryRole;
- if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
- setPrimaryNodeId(node.getNodeId());
- addEvent(node.getNodeId(), "Setting primary role in connection response.");
- primaryRole = true;
- } else {
- primaryRole = false;
- }
-
- return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
+ return new ConnectionResponse(node.getNodeId(), cachedDataFlow, remoteInputPort, remoteCommsSecure, instanceId);
}
/*
@@ -848,9 +787,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect.");
}
- // clear out old heartbeat info
- node.setHeartbeat(null);
-
// get the dataflow to send with the reconnection request
if (!dataFlowManagementService.isFlowCurrent()) {
/* node remains disconnected */
@@ -867,7 +803,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
}
- node.setStatus(Status.CONNECTING);
+ clusterCoordinator.updateNodeStatus(node, Status.CONNECTING);
addEvent(node.getNodeId(), "Reconnection requested for node. Setting status to connecting.");
// determine if this node should be assigned the primary role
@@ -889,7 +825,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
} catch (final Exception ex) {
logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex);
- node.setStatus(Status.DISCONNECTED);
+ clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex;
addEvent(node.getNodeId(), eventMsg);
addBulletin(node, Severity.WARNING, eventMsg);
@@ -1214,7 +1150,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
}
- requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
+
+ clusterCoordinator.requestNodeDisconnect(node.getNodeId(), DisconnectionCode.USER_DISCONNECTED, "User " + userDn + " Disconnected Node");
} finally {
writeLock.unlock("requestDisconnection(String)");
}
@@ -1246,7 +1183,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
* true.
* @throws NodeDisconnectionException if the disconnection message fails to be sent.
*/
- private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
+ void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
throws IllegalNodeDisconnectionException, NodeDisconnectionException {
writeLock.lock();
@@ -1277,14 +1214,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// cannot disconnect the last connected node in the cluster
if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) {
throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster.");
- } else if (isPrimaryNode(nodeId)) {
- // cannot disconnect the primary node in the cluster
- throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster.");
}
}
// update status
- node.setStatus(Status.DISCONNECTED);
+ clusterCoordinator.updateNodeStatus(node, Status.DISCONNECTED);
notifyDataFlowManagementServiceOfNodeStatusChange();
// issue the disconnection
@@ -1296,6 +1230,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
senderListener.disconnect(request);
addEvent(nodeId, "Node disconnected due to " + explanation);
addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation);
+
+ heartbeatMonitor.removeHeartbeat(nodeId);
} finally {
writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
}
@@ -1318,36 +1254,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return responseMessage;
}
- private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
- writeLock.lock();
- try {
- final Node node = getRawNode(msg.getNodeId().getId());
- if (node != null) {
- node.setStatus(Status.DISCONNECTED);
- addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported "
- + "the following error: " + msg.getExceptionMessage());
- addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node "
- + "reported the following error: " + msg.getExceptionMessage());
- }
- } finally {
- writeLock.unlock("handleControllerStartupFailure");
- }
- }
-
- private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
- writeLock.lock();
- try {
- final Node node = getRawNode(msg.getNodeId().getId());
- if (node != null) {
- node.setStatus(Status.DISCONNECTED);
- final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
- addEvent(msg.getNodeId(), errorMsg);
- addBulletin(node, Severity.ERROR, errorMsg);
- }
- } finally {
- writeLock.unlock("handleControllerStartupFailure");
- }
- }
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
@@ -1623,176 +1529,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
- /**
- * Handles a node's heartbeat. If this heartbeat is a node's first heartbeat since its connection request, then the manager will mark the node as connected. If the node was previously disconnected
- * due to a lack of heartbeat, then a reconnection request is issued. If the node was disconnected for other reasons, then a disconnection request is issued. If this instance is configured with a
- * firewall and the heartbeat is blocked, then a disconnection request is issued.
- */
- @Override
- public void handleHeartbeat(final Heartbeat heartbeat) {
- // sanity check heartbeat
- if (heartbeat == null) {
- throw new IllegalArgumentException("Heartbeat may not be null.");
- } else if (heartbeat.getNodeIdentifier() == null) {
- throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
- }
-
- /*
- * Processing a heartbeat requires a write lock, which may take a while
- * to obtain. Only the last heartbeat is necessary to process per node.
- * Futhermore, since many could pile up, heartbeats are processed in
- * bulk.
- * The below queue stores the pending heartbeats.
- */
- pendingHeartbeats.add(heartbeat);
- }
-
- private void processPendingHeartbeats() {
- Node node;
-
- writeLock.lock();
- try {
- /*
- * Get the most recent heartbeats for the nodes in the cluster. This
- * is achieved by "draining" the pending heartbeats queue, populating
- * a map that associates a node identifier with its latest heartbeat, and
- * finally, getting the values of the map.
- */
- final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
- Heartbeat aHeartbeat;
- while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
- mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
- }
- final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
-
- // return fast if no work to do
- if (mostRecentHeartbeats.isEmpty()) {
- return;
- }
-
- logNodes("Before Heartbeat Processing", heartbeatLogger);
-
- final int numPendingHeartbeats = mostRecentHeartbeats.size();
- if (heartbeatLogger.isDebugEnabled()) {
- heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
- }
-
- for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
- try {
- // resolve the proposed node identifier to valid node identifier
- final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
-
- // get a raw reference to the node (if it doesn't exist, node will be null)
- node = getRawNode(resolvedNodeIdentifier.getId());
-
- final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
-
- if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
- if (node == null) {
- logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ". Issuing disconnection request.");
- } else {
- // record event
- addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat. Issuing disconnection request.");
- }
-
- // request node to disconnect
- requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall");
-
- } else if (node == null) { // unknown node, so issue reconnect request
- // create new node and add to node set
- final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED);
- nodes.add(newNode);
-
- // record event
- addEvent(newNode.getNodeId(), "Received heartbeat from unknown node. Issuing reconnection request.");
-
- // record heartbeat
- newNode.setHeartbeat(mostRecentHeartbeat);
- requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
- } else if (heartbeatIndicatesNotYetConnected) {
- if (Status.CONNECTED == node.getStatus()) {
- // record event
- addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it "
- + "was. Marking as Disconnected and issuing reconnection request.");
-
- // record heartbeat
- node.setHeartbeat(null);
- node.setStatus(Status.DISCONNECTED);
-
- requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
- }
- } else if (Status.DISCONNECTED == node.getStatus()) {
- // ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
- // the only node. We allow it if it is the only node because if we have a one-node cluster, then
- // we cannot manually reconnect it.
- if (node.isHeartbeatDisconnection() || nodes.size() == 1) {
- // record event
- if (node.isHeartbeatDisconnection()) {
- addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat. Issuing reconnection request.");
- } else {
- addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request.");
- }
-
- // record heartbeat
- node.setHeartbeat(mostRecentHeartbeat);
-
- // request reconnection
- requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
- } else {
- // disconnected nodes should not heartbeat, so we need to issue a disconnection request
- heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ". Issuing disconnection request.");
-
- // request node to disconnect
- requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected");
- }
-
- } else if (Status.DISCONNECTING == node.getStatus()) {
- /* ignore spurious heartbeat */
- } else { // node is either either connected or connecting
- // first heartbeat causes status change from connecting to connected
- if (Status.CONNECTING == node.getStatus()) {
- if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) {
- heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect.");
- continue;
- }
-
- // set status to connected
- node.setStatus(Status.CONNECTED);
-
- // record event
- addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node. Setting node to connected.");
-
- // notify service of updated node set
- notifyDataFlowManagementServiceOfNodeStatusChange();
-
- addBulletin(node, Severity.INFO, "Node Connected");
- } else {
- heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + ".");
- }
-
- // record heartbeat
- node.setHeartbeat(mostRecentHeartbeat);
-
- if (mostRecentHeartbeat.isPrimary()) {
- setPrimaryNodeId(node.getNodeId());
- }
- }
- } catch (final Exception e) {
- logger.error("Failed to process heartbeat from {}:{} due to {}",
- mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
- if (logger.isDebugEnabled()) {
- logger.error("", e);
- }
- }
- }
-
- logNodes("After Heartbeat Processing", heartbeatLogger);
- } finally {
- writeLock.unlock("processPendingHeartbeats");
- }
- }
-
-
@Override
public Set<Node> getNodes(final Status... statuses) {
final Set<Status> desiredStatusSet = new HashSet<>();
@@ -2125,15 +1861,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- private boolean isPrimaryNode(final NodeIdentifier nodeId) {
- readLock.lock();
- try {
- return primaryNodeId != null && primaryNodeId.equals(nodeId);
- } finally {
- readLock.unlock("isPrimaryNode");
- }
- }
-
private boolean isInSafeMode() {
readLock.lock();
try {
@@ -2143,7 +1870,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- private void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
+ void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
writeLock.lock();
try {
dataFlowManagementService.updatePrimaryNode(primaryNodeId);
@@ -3321,7 +3048,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// create new "updated" node by cloning old node and updating status
final Node currentNode = getRawNode(nodeResponse.getNodeId().getId());
final Node updatedNode = currentNode.clone();
- updatedNode.setStatus(nodeStatus);
+ clusterCoordinator.updateNodeStatus(updatedNode, nodeStatus);
// map updated node to its response
updatedNodesMap.put(updatedNode, nodeResponse);
@@ -4041,7 +3768,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Node node = updatedNodeEntry.getKey();
if (problematicNodeResponses.contains(nodeResponse)) {
- node.setStatus(Status.CONNECTED);
+ clusterCoordinator.updateNodeStatus(node, Status.CONNECTED);
problematicNodeResponses.remove(nodeResponse);
}
}
@@ -4227,7 +3954,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
*
* @return false if the IP is listed in the firewall or if the firewall is not configured; true otherwise
*/
- private boolean isBlockedByFirewall(final String ip) {
+ boolean isBlockedByFirewall(final String ip) {
if (isFirewallConfigured()) {
return !clusterFirewall.isPermissible(ip);
} else {
@@ -4257,7 +3984,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- private Node getRawNode(final String nodeId) {
+ Node getRawNode(final String nodeId) {
readLock.lock();
try {
for (final Node node : nodes) {
@@ -4320,16 +4047,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
-
- private boolean isHeartbeatMonitorRunning() {
- readLock.lock();
- try {
- return heartbeatMonitor != null;
- } finally {
- readLock.unlock("isHeartbeatMonitorRunning");
- }
- }
-
private boolean canChangeNodeState(final String method, final URI uri) {
return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
}
@@ -4359,87 +4076,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
- private void logNodes(final String header, final Logger logger) {
- if (logger.isTraceEnabled()) {
- if (StringUtils.isNotBlank(header)) {
- logger.trace(header);
- }
- for (final Node node : getNodes()) {
- logger.trace(node.getNodeId() + " : " + node.getStatus());
- }
- }
- }
-
-
- /**
- * This timer task simply processes any pending heartbeats. This timer task is not strictly needed, as HeartbeatMonitoringTimerTask will do this. However, this task is scheduled much more
- * frequently and by processing the heartbeats more frequently, the stats that we report have less of a delay.
- */
- private class ProcessPendingHeartbeatsTask extends TimerTask {
-
- @Override
- public void run() {
- writeLock.lock();
- try {
- processPendingHeartbeats();
- } finally {
- writeLock.unlock("Process Pending Heartbeats Task");
- }
- }
- }
-
- /**
- * A timer task to detect nodes that have not sent a heartbeat in a while. The "problem" nodes are marked as disconnected due to lack of heartbeat by the task. No disconnection request is sent to
- * the node. This is because either the node is not functioning in which case sending the request is futile or the node is running a bit slow. In the latter case, we'll wait for the next heartbeat
- * and send a reconnection request when we process the heartbeat in the heartbeatHandler() method.
- */
- private class HeartbeatMonitoringTimerTask extends TimerTask {
-
- @Override
- public void run() {
- // keep track of any status changes
- boolean statusChanged = false;
-
- writeLock.lock();
- try {
- // process all of the heartbeats before we decided to kick anyone out of the cluster.
- logger.debug("Processing pending heartbeats...");
- processPendingHeartbeats();
-
- logger.debug("Executing heartbeat monitoring");
-
- // check for any nodes that have not heartbeated in a long time
- for (final Node node : getRawNodes(Status.CONNECTED)) {
- // return prematurely if we were interrupted
- if (Thread.currentThread().isInterrupted()) {
- return;
- }
-
- // check if we received a recent heartbeat, changing status to disconnected if necessary
- final long lastHeardTimestamp = node.getHeartbeat().getCreatedTimestamp();
- final int heartbeatGapSeconds = (int) (new Date().getTime() - lastHeardTimestamp) / 1000;
- if (heartbeatGapSeconds > getMaxHeartbeatGapSeconds()) {
- node.setHeartbeatDisconnection();
- addEvent(node.getNodeId(), "Node disconnected due to lack of heartbeat.");
- addBulletin(node, Severity.WARNING, "Node disconnected due to lack of heartbeat");
- statusChanged = true;
- }
- }
-
- // if a status change occurred, make the necessary updates
- if (statusChanged) {
- logNodes("Heartbeat Monitoring disconnected node(s)", logger);
- // notify service of updated node set
- notifyDataFlowManagementServiceOfNodeStatusChange();
- } else {
- logNodes("Heartbeat Monitoring determined all nodes are healthy", logger);
- }
- } catch (final Exception ex) {
- logger.warn("Heartbeat monitor experienced exception while monitoring: " + ex, ex);
- } finally {
- writeLock.unlock("HeartbeatMonitoringTimerTask");
- }
- }
+ public NodeHeartbeat getLatestHeartbeat(final NodeIdentifier identifier) { // delegates to the heartbeat monitor
+ return this.heartbeatMonitor.getLatestHeartbeat(identifier);
    }
@Override
@@ -4449,16 +4087,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final Collection<NodeInformation> nodeInfos = new ArrayList<>();
for (final Node node : getRawNodes(Status.CONNECTED)) {
final NodeIdentifier id = node.getNodeId();
- final HeartbeatPayload heartbeat = node.getHeartbeatPayload();
- if (heartbeat == null) {
- continue;
- }
final Integer siteToSitePort = id.getSiteToSitePort();
if (siteToSitePort == null) {
continue;
}
- final int flowFileCount = (int) heartbeat.getTotalFlowFileCount();
+
+ final NodeHeartbeat nodeHeartbeat = heartbeatMonitor.getLatestHeartbeat(id);
+ final int flowFileCount = nodeHeartbeat == null ? 0 : nodeHeartbeat.getFlowFileCount();
final NodeInformation nodeInfo = new NodeInformation(id.getSiteToSiteAddress(), siteToSitePort, id.getApiPort(),
id.isSiteToSiteSecure(), flowFileCount);
nodeInfos.add(nodeInfo);
@@ -4626,4 +4262,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
}
+
+ public void reportEvent(final NodeIdentifier nodeId, final Severity severity, final String message) {
+ final String category = (nodeId == null) ? "Cluster" : nodeId.getId();
+ bulletinRepository.addBulletin(BulletinFactory.createBulletin(category, severity.name(), message));
+ if (nodeId == null) { return; } // no node to attach a per-node event to
+ addEvent(nodeId, message);
+ }
}