You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jf...@apache.org on 2021/05/01 21:23:23 UTC
[nifi] branch main updated: NIFI-8433 Added ability to decommission
a node in a cluster
This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 935566b NIFI-8433 Added ability to decommission a node in a cluster
935566b is described below
commit 935566ba235cc5705301305b6c88d4b24094cffe
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Apr 14 16:25:59 2021 -0400
NIFI-8433 Added ability to decommission a node in a cluster
This closes #5004
Signed-off-by: Joey Frazee <jf...@apache.org>
---
.../java/org/apache/nifi/bootstrap/RunNiFi.java | 181 ++++++++++++++-------
.../apache/nifi/controller/DecommissionTask.java | 24 +--
.../nifi/documentation/example/NiFiServerStub.java | 6 +
.../cluster/coordination/ClusterCoordinator.java | 7 +-
.../StandardClusterCoordinationProtocolSender.java | 62 ++++---
.../heartbeat/AbstractHeartbeatMonitor.java | 1 +
.../coordination/node/NodeClusterCoordinator.java | 79 +++++++--
.../cluster/lifecycle/ClusterDecommissionTask.java | 180 ++++++++++++++++++++
.../resources/nifi-cluster-manager-context.xml | 8 +-
.../heartbeat/TestAbstractHeartbeatMonitor.java | 13 +-
.../org/apache/nifi/controller/FlowController.java | 1 +
.../apache/nifi/headless/HeadlessNiFiServer.java | 6 +
.../nifi-resources/src/main/resources/bin/nifi.sh | 4 +-
.../java/org/apache/nifi/BootstrapListener.java | 28 ++++
.../org/apache/nifi/web/server/JettyServer.java | 8 +
.../src/main/java/org/apache/nifi/NiFiServer.java | 3 +
16 files changed, 492 insertions(+), 119 deletions(-)
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index dc213b5..7d3ecc8 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -111,8 +111,10 @@ public class RunNiFi {
public static final String PID_KEY = "pid";
public static final int STARTUP_WAIT_SECONDS = 60;
+ public static final long GRACEFUL_SHUTDOWN_RETRY_MILLIS = 2000L;
public static final String SHUTDOWN_CMD = "SHUTDOWN";
+ public static final String DECOMMISSION_CMD = "DECOMMISSION";
public static final String PING_CMD = "PING";
public static final String DUMP_CMD = "DUMP";
public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
@@ -169,6 +171,7 @@ public class RunNiFi {
System.out.println("Start : Start a new instance of Apache NiFi");
System.out.println("Stop : Stop a running instance of Apache NiFi");
System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
+ System.out.println("Decommission : Disconnects Apache NiFi from its cluster, offloads its data to other nodes in the cluster, removes itself from the cluster, and shuts down the instance");
System.out.println("Status : Determine if there is a running instance of Apache NiFi");
System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
@@ -219,6 +222,7 @@ public class RunNiFi {
case "start":
case "run":
case "stop":
+ case "decommission":
case "status":
case "is_loaded":
case "dump":
@@ -245,6 +249,9 @@ public class RunNiFi {
case "stop":
runNiFi.stop();
break;
+ case "decommission":
+ exitStatus = runNiFi.decommission();
+ break;
case "status":
exitStatus = runNiFi.status();
break;
@@ -810,6 +817,63 @@ public class RunNiFi {
"Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
}
+ public Integer decommission() throws IOException {
+ final Logger logger = cmdLogger;
+ final Integer port = getCurrentPort(logger);
+ if (port == null) {
+ logger.info("Apache NiFi is not currently running");
+ return 15;
+ }
+
+ // indicate that a stop command is in progress
+ final File lockFile = getLockFile(logger);
+ if (!lockFile.exists()) {
+ lockFile.createNewFile();
+ }
+
+ final Properties nifiProps = loadProperties(logger);
+ final String secretKey = nifiProps.getProperty("secret.key");
+ final String pid = nifiProps.getProperty(PID_KEY);
+ final File statusFile = getStatusFile(logger);
+ final File pidFile = getPidFile(logger);
+
+ try (final Socket socket = new Socket()) {
+ logger.debug("Connecting to NiFi instance");
+ socket.setSoTimeout(10000);
+ socket.connect(new InetSocketAddress("localhost", port));
+ logger.debug("Established connection to NiFi instance.");
+
+ // We don't know how long it will take for the offloading to complete. It could be a while. So don't timeout.
+ // User can press Ctrl+C to terminate if they don't want to wait
+ socket.setSoTimeout(0);
+
+ logger.debug("Sending DECOMMISSION Command to port {}", port);
+ final OutputStream out = socket.getOutputStream();
+ out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ socket.shutdownOutput();
+
+ final String response = readResponse(socket.getInputStream());
+
+ if (DECOMMISSION_CMD.equals(response)) {
+ logger.debug("Received response to DECOMMISSION command: {}", response);
+
+ if (pid != null) {
+ waitForShutdown(pid, logger, statusFile, pidFile);
+ }
+
+ return null;
+ } else {
+ logger.error("When sending DECOMMISSION command to NiFi, got unexpected response {}", response);
+ return 18;
+ }
+ } finally {
+ if (lockFile.exists() && !lockFile.delete()) {
+ logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
+ }
+ }
+ }
+
public void stop() throws IOException {
final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger);
@@ -843,69 +907,17 @@ public class RunNiFi {
out.flush();
socket.shutdownOutput();
- final InputStream in = socket.getInputStream();
- int lastChar;
- final StringBuilder sb = new StringBuilder();
- while ((lastChar = in.read()) > -1) {
- sb.append((char) lastChar);
- }
- final String response = sb.toString().trim();
-
+ final String response = readResponse(socket.getInputStream());
logger.debug("Received response to SHUTDOWN command: {}", response);
if (SHUTDOWN_CMD.equals(response)) {
logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
if (pid != null) {
- final Properties bootstrapProperties = new Properties();
- try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
- bootstrapProperties.load(fis);
- }
-
- String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
- int gracefulShutdownSeconds;
- try {
- gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
- } catch (final NumberFormatException nfe) {
- gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
- }
-
- notifyStop();
- final long startWait = System.nanoTime();
- while (isProcessRunning(pid, logger)) {
- logger.info("Waiting for Apache NiFi to finish shutting down...");
- final long waitNanos = System.nanoTime() - startWait;
- final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
- if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
- if (isProcessRunning(pid, logger)) {
- logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
- try {
- killProcessTree(pid, logger);
- } catch (final IOException ioe) {
- logger.error("Failed to kill Process with PID {}", pid);
- }
- }
- break;
- } else {
- try {
- Thread.sleep(2000L);
- } catch (final InterruptedException ie) {
- }
- }
- }
-
- if (statusFile.exists() && !statusFile.delete()) {
- logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
- }
-
- if (pidFile.exists() && !pidFile.delete()) {
- logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
- }
-
- logger.info("NiFi has finished shutting down.");
+ waitForShutdown(pid, logger, statusFile, pidFile);
}
} else {
- logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
+ logger.error("When sending SHUTDOWN command to NiFi, got unexpected response: {}", response);
}
} catch (final IOException ioe) {
if (pid == null) {
@@ -926,6 +938,65 @@ public class RunNiFi {
}
}
+ private String readResponse(final InputStream in) throws IOException {
+ int lastChar;
+ final StringBuilder sb = new StringBuilder();
+ while ((lastChar = in.read()) > -1) {
+ sb.append((char) lastChar);
+ }
+
+ return sb.toString().trim();
+ }
+
+ private void waitForShutdown(final String pid, final Logger logger, final File statusFile, final File pidFile) throws IOException {
+ final Properties bootstrapProperties = new Properties();
+ try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
+ bootstrapProperties.load(fis);
+ }
+
+ String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+ int gracefulShutdownSeconds;
+ try {
+ gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
+ } catch (final NumberFormatException nfe) {
+ gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+ }
+
+ notifyStop();
+ final long startWait = System.nanoTime();
+ while (isProcessRunning(pid, logger)) {
+ logger.info("Waiting for Apache NiFi to finish shutting down...");
+ final long waitNanos = System.nanoTime() - startWait;
+ final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+ if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
+ if (isProcessRunning(pid, logger)) {
+ logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
+ try {
+ killProcessTree(pid, logger);
+ } catch (final IOException ioe) {
+ logger.error("Failed to kill Process with PID {}", pid);
+ }
+ }
+ break;
+ } else {
+ try {
+ Thread.sleep(GRACEFUL_SHUTDOWN_RETRY_MILLIS);
+ } catch (final InterruptedException ie) {
+ }
+ }
+ }
+
+ if (statusFile.exists() && !statusFile.delete()) {
+ logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
+ }
+
+ if (pidFile.exists() && !pidFile.delete()) {
+ logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
+ }
+
+ logger.info("NiFi has finished shutting down.");
+ }
+
private static List<String> getChildProcesses(final String ppid) throws IOException {
final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
final List<String> childPids = new ArrayList<>();
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
similarity index 60%
copy from nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
index 8124876..58aa3de 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
@@ -14,27 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi;
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.diagnostics.DiagnosticsFactory;
-import org.apache.nifi.nar.ExtensionMapping;
-import org.apache.nifi.util.NiFiProperties;
+package org.apache.nifi.controller;
-import java.util.Set;
-
-/**
- *
- */
-public interface NiFiServer {
-
- void start();
-
- void initialize(NiFiProperties properties, Bundle systemBundle, Set<Bundle> bundles, ExtensionMapping extensionMapping);
-
- void stop();
-
- DiagnosticsFactory getDiagnosticsFactory();
-
- DiagnosticsFactory getThreadDumpFactory();
+public interface DecommissionTask {
+ void decommission() throws InterruptedException;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
index f77b909..d82bec9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
@@ -18,6 +18,7 @@ package org.apache.nifi.documentation.example;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -54,4 +55,9 @@ public class NiFiServerStub implements NiFiServer {
public DiagnosticsFactory getThreadDumpFactory() {
return null;
}
+
+ @Override
+ public DecommissionTask getDecommissionTask() {
+ return null;
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 2b7c07f..a27dac3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -18,11 +18,11 @@
package org.apache.nifi.cluster.coordination;
import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
-import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@@ -32,6 +32,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Future;
/**
* <p>
@@ -85,7 +86,7 @@ public interface ClusterCoordinator {
* @param offloadCode the code that represents why this node is being asked to be offloaded
* @param explanation an explanation as to why the node is being asked to be offloaded
*/
- void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
+ Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
/**
* Sends a request to the node to disconnect from the cluster.
@@ -96,7 +97,7 @@ public interface ClusterCoordinator {
* @param explanation an explanation as to why the node is being asked to disconnect
* from the cluster
*/
- void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+ Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
/**
* Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
index b21068f..4621172 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
@@ -16,19 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -36,11 +23,11 @@ import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -52,6 +39,20 @@ import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A protocol sender for sending protocol messages from the cluster manager to
* nodes.
@@ -286,14 +287,35 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
executor.submit(new Runnable() {
@Override
public void run() {
- try (final Socket socket = createSocket(nodeId, true)) {
- // marshal message to output stream
- socket.getOutputStream().write(msgBytes);
- } catch (final IOException ioe) {
- throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
+ final int attempts = 10;
+ final int retrySeconds = 6;
+ Exception lastException = null;
+
+ for (int i = 0; i < attempts; i++) {
+ try (final Socket socket = createSocket(nodeId, true)) {
+ // marshal message to output stream
+ final OutputStream out = socket.getOutputStream();
+ out.write(msgBytes);
+ } catch (final Exception e) {
+ logger.warn("Failed to send Node Status Change message to {}", nodeId, e);
+
+ lastException = e;
+
+ try {
+ Thread.sleep(retrySeconds * 1000L);
+ } catch (final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+
+ continue;
+ }
+
+ logger.debug("Notified {} of status change {}", nodeId, msg);
+ return;
}
- logger.debug("Notified {} of status change {}", nodeId, msg);
+ throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, lastException);
}
});
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 2b02a49..c8a77ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -343,4 +343,5 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) {
}
}
+
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index d029034..5326073 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -86,9 +86,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
@@ -270,7 +272,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return nodeId;
}
- private NodeIdentifier waitForElectedClusterCoordinator() {
+ public NodeIdentifier waitForElectedClusterCoordinator() {
return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false));
}
@@ -312,7 +314,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
removeNode(nodeId);
} else {
- updateNodeStatus(nodeId, proposedStatus, false);
+ forcefullyUpdateNodeStatus(nodeId, proposedStatus, false);
}
}
@@ -339,11 +341,45 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
return removed;
}
+ /**
+ * Updates the status of the node with the given ID to the given status if and only if the updated status's Update ID is >= the Update ID of the current
+ * Node status or if there is currently no node with the given identifier
+ * @param nodeId the NodeIdentifier for the node whose ID is to be updated
+ * @param updatedStatus the new status for the node
+ * @return the previous status for the node
+ */
private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) {
return updateNodeStatus(nodeId, updatedStatus, true);
}
private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
+ final String nodeUuid = nodeId.getId();
+
+ while (true) {
+ final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeUuid);
+ if (currentStatus == null) {
+ onNodeAdded(nodeId, storeState);
+
+ // Return null because that was the previous state
+ return null;
+ }
+
+ if (currentStatus.getUpdateIdentifier() > updatedStatus.getUpdateIdentifier()) {
+ logger.debug("Received status update {} but ignoring it because it has an Update ID of {} and the current status has an Update ID of {}",
+ updatedStatus, updatedStatus.getUpdateIdentifier(), currentStatus.getUpdateIdentifier());
+ return currentStatus;
+ }
+
+ final boolean updated = nodeStatuses.replace(nodeUuid, currentStatus, updatedStatus);
+ if (updated) {
+ onNodeStateChange(nodeId, updatedStatus.getState());
+ logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, updatedStatus);
+ return currentStatus;
+ }
+ }
+ }
+
+ private NodeConnectionStatus forcefullyUpdateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId.getId(), updatedStatus);
if (evictedStatus == null) {
onNodeAdded(nodeId, storeState);
@@ -502,12 +538,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
@Override
- public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
+ public Future<Void> requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
final Set<NodeIdentifier> offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED);
if (offloadNodeIds.contains(nodeId)) {
logger.debug("Attempted to offload node but the node is already offloading or offloaded");
// no need to do anything here, the node is currently offloading or already offloaded
- return;
+ return CompletableFuture.completedFuture(null);
}
final Set<NodeIdentifier> disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED);
@@ -524,11 +560,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setExplanation(explanation);
addNodeEvent(nodeId, "Offload requested due to " + explanation);
- offloadAsynchronously(request, 10, 5);
+ return offloadAsynchronously(request, 10, 5);
}
@Override
- public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+ public Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) {
throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
@@ -541,7 +577,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
// There is no need to tell the node that it's disconnected if it is due to being
// shutdown, as we will not be able to connect to the node anyway.
if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) {
- return;
+ return CompletableFuture.completedFuture(null);
}
final DisconnectMessage request = new DisconnectMessage();
@@ -549,7 +585,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
request.setExplanation(explanation);
addNodeEvent(nodeId, "Disconnection requested due to " + explanation);
- disconnectAsynchronously(request, 10, 5);
+ return disconnectAsynchronously(request, 10, 5);
}
@Override
@@ -834,12 +870,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) {
final NodeIdentifier nodeId = status.getNodeIdentifier();
- // In this case, we are using nodeStatuses.put() instead of getting the current value and
+ // In this case, we are using nodeStatuses.put() (i.e., forcefully updating node status) instead of getting the current value and
// comparing that to the new value and using the one with the largest update id. This is because
// this method is called when something occurs that causes this node to change the status of the
// node in question. We only use comparisons against the current value when we receive an update
// about a node status from a different node, since those may be received out-of-order.
- final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
+ final NodeConnectionStatus currentStatus = forcefullyUpdateNodeStatus(nodeId, status, true);
final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
if (Objects.equals(status, currentStatus)) {
logger.debug("Received notification of Node Status Change for {} but the status remained the same: {}", nodeId, status);
@@ -901,19 +937,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
senderListener.notifyNodeStatusChange(nodesToNotify, message);
}
- private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
+ private Future<Void> offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
final Thread offloadThread = new Thread(new Runnable() {
@Override
public void run() {
final NodeIdentifier nodeId = request.getNodeId();
+ Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
senderListener.offload(request);
reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
+ future.complete(null);
return;
} catch (final Exception e) {
logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e);
+ lastException = e;
try {
Thread.sleep(retrySeconds * 1000L);
@@ -923,38 +964,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
}
+
+ future.completeExceptionally(lastException);
}
}, "Offload " + request.getNodeId());
offloadThread.start();
+ return future;
}
- private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
+ private Future<Void> disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
+ final CompletableFuture<Void> future = new CompletableFuture<>();
+
final Thread disconnectThread = new Thread(new Runnable() {
@Override
public void run() {
final NodeIdentifier nodeId = request.getNodeId();
+ Exception lastException = null;
for (int i = 0; i < attempts; i++) {
try {
senderListener.disconnect(request);
reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
+ future.complete(null);
return;
} catch (final Exception e) {
logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", request.getNodeId(), request.getExplanation());
+ lastException = e;
try {
Thread.sleep(retrySeconds * 1000L);
} catch (final InterruptedException ie) {
+ future.completeExceptionally(ie);
Thread.currentThread().interrupt();
return;
}
}
}
+
+ future.completeExceptionally(lastException);
}
}, "Disconnect " + request.getNodeId());
disconnectThread.start();
+ return future;
}
public void validateHeartbeat(final NodeHeartbeat heartbeat) {
@@ -1092,12 +1145,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
if (removeNodeConditionally(nodeId, oldStatus)) {
storeState();
+ logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
}
} else {
updateNodeStatus(nodeId, updatedStatus);
}
- logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
logger.debug("State of cluster nodes is now {}", nodeStatuses);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
new file mode 100644
index 0000000..592e51c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.lifecycle;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.FlowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterDecommissionTask implements DecommissionTask {
+ private static final Logger logger = LoggerFactory.getLogger(ClusterDecommissionTask.class);
+ private static final int delaySeconds = 3;
+
+ private final ClusterCoordinator clusterCoordinator;
+ private final FlowController flowController;
+ private NodeIdentifier localNodeIdentifier;
+
+ public ClusterDecommissionTask(final ClusterCoordinator clusterCoordinator, final FlowController flowController) {
+ this.clusterCoordinator = clusterCoordinator;
+ this.flowController = flowController;
+ }
+
+ @Override
+ public synchronized void decommission() throws InterruptedException {
+ if (clusterCoordinator == null) {
+ throw new IllegalStateException("Cannot decommission Node because it is not part of a cluster");
+ }
+
+ logger.info("Decommissioning Node...");
+ localNodeIdentifier = clusterCoordinator.getLocalNodeIdentifier();
+ if (localNodeIdentifier == null) {
+ throw new IllegalStateException("Node has not yet connected to the cluster");
+ }
+
+ flowController.stopHeartbeating();
+ flowController.setClustered(false, null);
+ logger.info("Instructed FlowController to stop sending heartbeats to Cluster Coordinator and take Cluster Disconnect actions");
+
+ disconnectNode();
+ logger.info("Requested that node be disconnected from cluster");
+
+ waitForDisconnection();
+ logger.info("Successfully disconnected node from cluster");
+
+ offloadNode();
+ logger.info("Successfully triggered Node Offload. Will wait for offload to complete");
+
+ waitForOffloadToFinish();
+ logger.info("Offload has successfully completed.");
+
+ removeFromCluster();
+ logger.info("Requested that node be removed from cluster.");
+
+ waitForRemoval();
+ logger.info("Node successfully removed from cluster. Decommission is complete.");
+ }
+
+ private void disconnectNode() throws InterruptedException {
+ logger.info("Requesting that Node disconnect from cluster");
+
+ while (true) {
+ final Future<Void> future = clusterCoordinator.requestNodeDisconnect(localNodeIdentifier, DisconnectionCode.USER_DISCONNECTED, "Node is being decommissioned");
+ try {
+ future.get();
+ return;
+ } catch (final ExecutionException e) {
+ final Throwable cause = e.getCause();
+ logger.error("Failed when attempting to disconnect node from cluster", cause);
+ }
+ }
+ }
+
+ private void waitForDisconnection() throws InterruptedException {
+ logger.info("Waiting for Node to be completely disconnected from cluster");
+ waitForState(Collections.singleton(NodeConnectionState.DISCONNECTED));
+ }
+
+ private void offloadNode() throws InterruptedException {
+ logger.info("Requesting that Node be offloaded");
+
+ while (true) {
+ final Future<Void> future = clusterCoordinator.requestNodeOffload(localNodeIdentifier, OffloadCode.OFFLOADED, "Node is being decommissioned");
+ try {
+ future.get();
+ break;
+ } catch (final ExecutionException e) {
+ final Throwable cause = e.getCause();
+ logger.error("Failed when attempting to disconnect node from cluster", cause);
+ }
+ }
+
+ // Wait until status changes to either OFFLOADING or OFFLOADED.
+ waitForState(new HashSet<>(Arrays.asList(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED)));
+ }
+
+ private void waitForState(final Set<NodeConnectionState> acceptableStates) throws InterruptedException {
+ while (true) {
+ final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ final NodeConnectionState state = status.getState();
+ logger.debug("Node state is {}", state);
+
+ if (acceptableStates.contains(state)) {
+ return;
+ }
+
+ TimeUnit.SECONDS.sleep(delaySeconds);
+ }
+ }
+
+ private void waitForOffloadToFinish() throws InterruptedException {
+ logger.info("Waiting for Node to finish offloading");
+
+ while (true) {
+ final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ final NodeConnectionState state = status.getState();
+ if (state == NodeConnectionState.OFFLOADED) {
+ return;
+ }
+
+ if (state != NodeConnectionState.OFFLOADING) {
+ throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
+ }
+
+ logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
+ TimeUnit.SECONDS.sleep(delaySeconds);
+ }
+ }
+
+ private void removeFromCluster() {
+ clusterCoordinator.removeNode(localNodeIdentifier, "<Local Decommission>");
+ }
+
+ private void waitForRemoval() throws InterruptedException {
+ logger.info("Waiting for Node to be completely removed from cluster");
+
+ while (true) {
+ final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+ if (status == null) {
+ return;
+ }
+
+ final NodeConnectionState state = status.getState();
+ if (state == NodeConnectionState.REMOVED) {
+ return;
+ }
+
+ logger.debug("Node state is {}. Will wait {} seconds and check again", state, delaySeconds);
+ TimeUnit.SECONDS.sleep(delaySeconds);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index c1a7665..a4c2573 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -51,9 +51,15 @@
<property name="properties" ref="nifiProperties"/>
<property name="extensionManager" ref="extensionManager" />
</bean>
-
+
<!-- Heartbeat Monitor -->
<bean id="heartbeatMonitor" class="org.apache.nifi.cluster.spring.HeartbeatMonitorFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>
+
+ <bean id="decommissionTask" class="org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask">
+ <constructor-arg ref="clusterCoordinator" />
+ <constructor-arg ref="flowController" />
+ </bean>
+
</beans>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 98aa0b4..5efb9ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -20,11 +20,11 @@ package org.apache.nifi.cluster.coordination.heartbeat;
import org.apache.nifi.cluster.ReportedEvent;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
-import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.event.NodeEvent;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.reporting.Severity;
@@ -45,7 +45,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
@@ -119,9 +121,10 @@ public class TestAbstractHeartbeatMonitor {
}
@Override
- public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+ public synchronized Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
requestedToDisconnect.add(nodeId);
+ return CompletableFuture.completedFuture(null);
}
};
@@ -253,13 +256,15 @@ public class TestAbstractHeartbeatMonitor {
}
@Override
- public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
+ public synchronized Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED));
+ return CompletableFuture.completedFuture(null);
}
@Override
- public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+ public synchronized Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2cb5e28..5154963 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2223,6 +2223,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
if (heartbeatSenderFuture != null) {
+ LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator");
heartbeatSenderFuture.cancel(false);
}
} finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index 833075f..c2b5143 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -28,6 +28,7 @@ import org.apache.nifi.authorization.exception.AuthorizationAccessException;
import org.apache.nifi.authorization.exception.AuthorizerCreationException;
import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.StandardFlowService;
import org.apache.nifi.controller.flow.FlowManager;
@@ -193,6 +194,11 @@ public class HeadlessNiFiServer implements NiFiServer {
return new ThreadDumpDiagnosticsFactory();
}
+ @Override
+ public DecommissionTask getDecommissionTask() {
+ return null;
+ }
+
public void stop() {
try {
flowService.stop(false);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index f1d1f59..63fbfec 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -430,7 +430,7 @@ case "$1" in
install "$@"
;;
- start|stop|run|status|is_loaded|dump|diagnostics|env|stateless)
+ start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless)
main "$@"
;;
@@ -440,6 +440,6 @@ case "$1" in
run "start"
;;
*)
- echo "Usage nifi {start|stop|run|restart|status|dump|diagnostics|install|stateless}"
+ echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless}"
;;
esac
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 9eeebaf..b9b12ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi;
+import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.util.LimitingInputStream;
import org.slf4j.Logger;
@@ -203,6 +204,23 @@ public class BootstrapListener {
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
+ case DECOMMISSION:
+ logger.info("Received DECOMMISSION request from Bootstrap");
+
+ try {
+ decommission();
+ sendAnswer(socket.getOutputStream(), "DECOMMISSION");
+ nifi.shutdownHook(false);
+ } catch (final Exception e) {
+ final OutputStream out = socket.getOutputStream();
+
+ out.write(("Failed to decommission node: " + e + "; see app-log for additional details").getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ } finally {
+ socket.close();
+ }
+
+ break;
case DIAGNOSTICS:
logger.info("Received DIAGNOSTICS request from Bootstrap");
final String[] args = request.getArgs();
@@ -250,6 +268,15 @@ public class BootstrapListener {
diagnosticsDump.writeTo(out);
}
+ private void decommission() throws InterruptedException {
+ final DecommissionTask decommissionTask = nifi.getServer().getDecommissionTask();
+ if (decommissionTask == null) {
+ throw new IllegalArgumentException("This NiFi instance does not support decommissioning");
+ }
+
+ decommissionTask.decommission();
+ }
+
private void writeDiagnostics(final OutputStream out, final boolean verbose) throws IOException {
final DiagnosticsDump diagnosticsDump = nifi.getServer().getDiagnosticsFactory().create(verbose);
diagnosticsDump.writeTo(out);
@@ -304,6 +331,7 @@ public class BootstrapListener {
SHUTDOWN,
DUMP,
DIAGNOSTICS,
+ DECOMMISSION,
PING,
IS_LOADED
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index 3804ea7..efcf271 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.NiFiServer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.UninheritableFlowException;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
@@ -153,6 +154,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
private NarAutoLoader narAutoLoader;
private DiagnosticsFactory diagnosticsFactory;
private SslContextFactory.Server sslContextFactory;
+ private DecommissionTask decommissionTask;
private WebAppContext webApiContext;
private WebAppContext webDocsContext;
@@ -1172,6 +1174,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
}
diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
+ decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
}
// ensure the web document war was loaded and provide the extension mapping
@@ -1245,6 +1248,11 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
return new ThreadDumpDiagnosticsFactory();
}
+ @Override
+ public DecommissionTask getDecommissionTask() {
+ return decommissionTask;
+ }
+
private void performInjectionForComponentUis(final Collection<WebAppContext> componentUiExtensionWebContexts,
final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) {
if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
index 8124876..cc87079 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
@@ -17,6 +17,7 @@
package org.apache.nifi;
import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.util.NiFiProperties;
@@ -37,4 +38,6 @@ public interface NiFiServer {
DiagnosticsFactory getDiagnosticsFactory();
DiagnosticsFactory getThreadDumpFactory();
+
+ DecommissionTask getDecommissionTask();
}