You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jf...@apache.org on 2021/05/01 21:23:23 UTC

[nifi] branch main updated: NIFI-8433 Added ability to decommission a node in a cluster

This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 935566b  NIFI-8433 Added ability to decommission a node in a cluster
935566b is described below

commit 935566ba235cc5705301305b6c88d4b24094cffe
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Wed Apr 14 16:25:59 2021 -0400

    NIFI-8433 Added ability to decommission a node in a cluster
    
    This closes #5004
    
    Signed-off-by: Joey Frazee <jf...@apache.org>
---
 .../java/org/apache/nifi/bootstrap/RunNiFi.java    | 181 ++++++++++++++-------
 .../apache/nifi/controller/DecommissionTask.java   |  24 +--
 .../nifi/documentation/example/NiFiServerStub.java |   6 +
 .../cluster/coordination/ClusterCoordinator.java   |   7 +-
 .../StandardClusterCoordinationProtocolSender.java |  62 ++++---
 .../heartbeat/AbstractHeartbeatMonitor.java        |   1 +
 .../coordination/node/NodeClusterCoordinator.java  |  79 +++++++--
 .../cluster/lifecycle/ClusterDecommissionTask.java | 180 ++++++++++++++++++++
 .../resources/nifi-cluster-manager-context.xml     |   8 +-
 .../heartbeat/TestAbstractHeartbeatMonitor.java    |  13 +-
 .../org/apache/nifi/controller/FlowController.java |   1 +
 .../apache/nifi/headless/HeadlessNiFiServer.java   |   6 +
 .../nifi-resources/src/main/resources/bin/nifi.sh  |   4 +-
 .../java/org/apache/nifi/BootstrapListener.java    |  28 ++++
 .../org/apache/nifi/web/server/JettyServer.java    |   8 +
 .../src/main/java/org/apache/nifi/NiFiServer.java  |   3 +
 16 files changed, 492 insertions(+), 119 deletions(-)

diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
index dc213b5..7d3ecc8 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/RunNiFi.java
@@ -111,8 +111,10 @@ public class RunNiFi {
     public static final String PID_KEY = "pid";
 
     public static final int STARTUP_WAIT_SECONDS = 60;
+    public static final long GRACEFUL_SHUTDOWN_RETRY_MILLIS = 2000L;
 
     public static final String SHUTDOWN_CMD = "SHUTDOWN";
+    public static final String DECOMMISSION_CMD = "DECOMMISSION";
     public static final String PING_CMD = "PING";
     public static final String DUMP_CMD = "DUMP";
     public static final String DIAGNOSTICS_CMD = "DIAGNOSTICS";
@@ -169,6 +171,7 @@ public class RunNiFi {
         System.out.println("Start : Start a new instance of Apache NiFi");
         System.out.println("Stop : Stop a running instance of Apache NiFi");
         System.out.println("Restart : Stop Apache NiFi, if it is running, and then start a new instance");
+        System.out.println("Decommission : Disconnects Apache NiFi from its cluster, offloads its data to other nodes in the cluster, removes itself from the cluster, and shuts down the instance");
         System.out.println("Status : Determine if there is a running instance of Apache NiFi");
         System.out.println("Dump : Write a Thread Dump to the file specified by [options], or to the log if no file is given");
         System.out.println("Diagnostics : Write diagnostic information to the file specified by [options], or to the log if no file is given. The --verbose flag may be provided as an option before " +
@@ -219,6 +222,7 @@ public class RunNiFi {
             case "start":
             case "run":
             case "stop":
+            case "decommission":
             case "status":
             case "is_loaded":
             case "dump":
@@ -245,6 +249,9 @@ public class RunNiFi {
             case "stop":
                 runNiFi.stop();
                 break;
+            case "decommission":
+                exitStatus = runNiFi.decommission();
+                break;
             case "status":
                 exitStatus = runNiFi.status();
                 break;
@@ -810,6 +817,63 @@ public class RunNiFi {
                 "Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
     }
 
+    public Integer decommission() throws IOException {
+        final Logger logger = cmdLogger;
+        final Integer port = getCurrentPort(logger);
+        if (port == null) {
+            logger.info("Apache NiFi is not currently running");
+            return 15;
+        }
+
+        // indicate that a decommission (which ends with a shutdown) is in progress
+        final File lockFile = getLockFile(logger);
+        if (!lockFile.exists()) {
+            lockFile.createNewFile();
+        }
+
+        final Properties nifiProps = loadProperties(logger);
+        final String secretKey = nifiProps.getProperty("secret.key");
+        final String pid = nifiProps.getProperty(PID_KEY);
+        final File statusFile = getStatusFile(logger);
+        final File pidFile = getPidFile(logger);
+
+        try (final Socket socket = new Socket()) {
+            logger.debug("Connecting to NiFi instance");
+            socket.setSoTimeout(10000);
+            socket.connect(new InetSocketAddress("localhost", port));
+            logger.debug("Established connection to NiFi instance.");
+
+            // We don't know how long it will take for the offloading to complete. It could be a while. So don't time out.
+            // User can press Ctrl+C to terminate if they don't want to wait
+            socket.setSoTimeout(0);
+
+            logger.debug("Sending DECOMMISSION Command to port {}", port);
+            final OutputStream out = socket.getOutputStream();
+            out.write((DECOMMISSION_CMD + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+            out.flush();
+            socket.shutdownOutput();
+
+            final String response = readResponse(socket.getInputStream());
+
+            if (DECOMMISSION_CMD.equals(response)) {
+                logger.debug("Received response to DECOMMISSION command: {}", response);
+
+                if (pid != null) {
+                    waitForShutdown(pid, logger, statusFile, pidFile);
+                }
+
+                return null;
+            } else {
+                logger.error("When sending DECOMMISSION command to NiFi, got unexpected response {}", response);
+                return 18;
+            }
+        } finally {
+            if (lockFile.exists() && !lockFile.delete()) {
+                logger.error("Failed to delete lock file {}; this file should be cleaned up manually", lockFile);
+            }
+        }
+    }
+
     public void stop() throws IOException {
         final Logger logger = cmdLogger;
         final Integer port = getCurrentPort(logger);
@@ -843,69 +907,17 @@ public class RunNiFi {
             out.flush();
             socket.shutdownOutput();
 
-            final InputStream in = socket.getInputStream();
-            int lastChar;
-            final StringBuilder sb = new StringBuilder();
-            while ((lastChar = in.read()) > -1) {
-                sb.append((char) lastChar);
-            }
-            final String response = sb.toString().trim();
-
+            final String response = readResponse(socket.getInputStream());
             logger.debug("Received response to SHUTDOWN command: {}", response);
 
             if (SHUTDOWN_CMD.equals(response)) {
                 logger.info("Apache NiFi has accepted the Shutdown Command and is shutting down now");
 
                 if (pid != null) {
-                    final Properties bootstrapProperties = new Properties();
-                    try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
-                        bootstrapProperties.load(fis);
-                    }
-
-                    String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
-                    int gracefulShutdownSeconds;
-                    try {
-                        gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
-                    } catch (final NumberFormatException nfe) {
-                        gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
-                    }
-
-                    notifyStop();
-                    final long startWait = System.nanoTime();
-                    while (isProcessRunning(pid, logger)) {
-                        logger.info("Waiting for Apache NiFi to finish shutting down...");
-                        final long waitNanos = System.nanoTime() - startWait;
-                        final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
-                        if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
-                            if (isProcessRunning(pid, logger)) {
-                                logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
-                                try {
-                                    killProcessTree(pid, logger);
-                                } catch (final IOException ioe) {
-                                    logger.error("Failed to kill Process with PID {}", pid);
-                                }
-                            }
-                            break;
-                        } else {
-                            try {
-                                Thread.sleep(2000L);
-                            } catch (final InterruptedException ie) {
-                            }
-                        }
-                    }
-
-                    if (statusFile.exists() && !statusFile.delete()) {
-                        logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
-                    }
-
-                    if (pidFile.exists() && !pidFile.delete()) {
-                        logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
-                    }
-
-                    logger.info("NiFi has finished shutting down.");
+                    waitForShutdown(pid, logger, statusFile, pidFile);
                 }
             } else {
-                logger.error("When sending SHUTDOWN command to NiFi, got unexpected response {}", response);
+                logger.error("When sending SHUTDOWN command to NiFi, got unexpected response: {}", response);
             }
         } catch (final IOException ioe) {
             if (pid == null) {
@@ -926,6 +938,65 @@ public class RunNiFi {
         }
     }
 
+    private String readResponse(final InputStream in) throws IOException {
+        int lastChar;
+        final StringBuilder sb = new StringBuilder();
+        while ((lastChar = in.read()) > -1) {
+            sb.append((char) lastChar);
+        }
+
+        return sb.toString().trim();
+    }
+
+    private void waitForShutdown(final String pid, final Logger logger, final File statusFile, final File pidFile) throws IOException {
+        final Properties bootstrapProperties = new Properties();
+        try (final FileInputStream fis = new FileInputStream(bootstrapConfigFile)) {
+            bootstrapProperties.load(fis);
+        }
+
+        String gracefulShutdown = bootstrapProperties.getProperty(GRACEFUL_SHUTDOWN_PROP, DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+        int gracefulShutdownSeconds;
+        try {
+            gracefulShutdownSeconds = Integer.parseInt(gracefulShutdown);
+        } catch (final NumberFormatException nfe) {
+            gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
+        }
+
+        notifyStop();
+        final long startWait = System.nanoTime();
+        while (isProcessRunning(pid, logger)) {
+            logger.info("Waiting for Apache NiFi to finish shutting down...");
+            final long waitNanos = System.nanoTime() - startWait;
+            final long waitSeconds = TimeUnit.NANOSECONDS.toSeconds(waitNanos);
+            if (waitSeconds >= gracefulShutdownSeconds && gracefulShutdownSeconds > 0) {
+                if (isProcessRunning(pid, logger)) {
+                    logger.warn("NiFi has not finished shutting down after {} seconds. Killing process.", gracefulShutdownSeconds);
+                    try {
+                        killProcessTree(pid, logger);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to kill Process with PID {}", pid);
+                    }
+                }
+                break;
+            } else {
+                try {
+                    Thread.sleep(GRACEFUL_SHUTDOWN_RETRY_MILLIS);
+                } catch (final InterruptedException ie) {
+                }
+            }
+        }
+
+        if (statusFile.exists() && !statusFile.delete()) {
+            logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
+        }
+
+        if (pidFile.exists() && !pidFile.delete()) {
+            logger.error("Failed to delete pid file {}; this file should be cleaned up manually", pidFile);
+        }
+
+        logger.info("NiFi has finished shutting down.");
+    }
+
     private static List<String> getChildProcesses(final String ppid) throws IOException {
         final Process proc = Runtime.getRuntime().exec(new String[]{"ps", "-o", "pid", "--no-headers", "--ppid", ppid});
         final List<String> childPids = new ArrayList<>();
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
similarity index 60%
copy from nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
copy to nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
index 8124876..58aa3de 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/DecommissionTask.java
@@ -14,27 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi;
 
-import org.apache.nifi.bundle.Bundle;
-import org.apache.nifi.diagnostics.DiagnosticsFactory;
-import org.apache.nifi.nar.ExtensionMapping;
-import org.apache.nifi.util.NiFiProperties;
+package org.apache.nifi.controller;
 
-import java.util.Set;
-
-/**
- *
- */
-public interface NiFiServer {
-
-    void start();
-
-    void initialize(NiFiProperties properties, Bundle systemBundle, Set<Bundle> bundles, ExtensionMapping extensionMapping);
-
-    void stop();
-
-    DiagnosticsFactory getDiagnosticsFactory();
-
-    DiagnosticsFactory getThreadDumpFactory();
+public interface DecommissionTask {
+    void decommission() throws InterruptedException;
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
index f77b909..d82bec9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/test/java/org/apache/nifi/documentation/example/NiFiServerStub.java
@@ -18,6 +18,7 @@ package org.apache.nifi.documentation.example;
 
 import org.apache.nifi.NiFiServer;
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.diagnostics.DiagnosticsFactory;
 import org.apache.nifi.nar.ExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
@@ -54,4 +55,9 @@ public class NiFiServerStub implements NiFiServer {
     public DiagnosticsFactory getThreadDumpFactory() {
         return null;
     }
+
+    @Override
+    public DecommissionTask getDecommissionTask() {
+        return null;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 2b7c07f..a27dac3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -18,11 +18,11 @@
 package org.apache.nifi.cluster.coordination;
 
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
-import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.reporting.Severity;
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 
 /**
  * <p>
@@ -85,7 +86,7 @@ public interface ClusterCoordinator {
      * @param offloadCode the code that represents why this node is being asked to be offloaded
      * @param explanation an explanation as to why the node is being asked to be offloaded
      */
-    void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
+    Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation);
 
     /**
      * Sends a request to the node to disconnect from the cluster.
@@ -96,7 +97,7 @@ public interface ClusterCoordinator {
      * @param explanation an explanation as to why the node is being asked to disconnect
      *            from the cluster
      */
-    void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
+    Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation);
 
     /**
      * Notifies the Cluster Coordinator that the node with the given ID has requested to disconnect
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
index b21068f..4621172 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
@@ -16,19 +16,6 @@
  */
 package org.apache.nifi.cluster.protocol.impl;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.ClusterCoordinationProtocolSender;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -36,11 +23,11 @@ import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
 import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
 import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -52,6 +39,20 @@ import org.apache.nifi.util.FormatUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * A protocol sender for sending protocol messages from the cluster manager to
  * nodes.
@@ -286,14 +287,35 @@ public class StandardClusterCoordinationProtocolSender implements ClusterCoordin
             executor.submit(new Runnable() {
                 @Override
                 public void run() {
-                    try (final Socket socket = createSocket(nodeId, true)) {
-                        // marshal message to output stream
-                        socket.getOutputStream().write(msgBytes);
-                    } catch (final IOException ioe) {
-                        throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, ioe);
+                    final int attempts = 10;
+                    final int retrySeconds = 6;
+                    Exception lastException = null;
+
+                    for (int i = 0; i < attempts; i++) {
+                        try (final Socket socket = createSocket(nodeId, true)) {
+                            // marshal message to output stream
+                            final OutputStream out = socket.getOutputStream();
+                            out.write(msgBytes);
+                        } catch (final Exception e) {
+                            logger.warn("Failed to send Node Status Change message to {}", nodeId, e);
+
+                            lastException = e;
+
+                            try {
+                                Thread.sleep(retrySeconds * 1000L);
+                            } catch (final InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+
+                            continue;
+                        }
+
+                        logger.debug("Notified {} of status change {}", nodeId, msg);
+                        return;
                     }
 
-                    logger.debug("Notified {} of status change {}", nodeId, msg);
+                    throw new ProtocolException("Failed to send Node Status Change message to " + nodeId, lastException);
                 }
             });
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 2b02a49..c8a77ed 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -343,4 +343,5 @@ public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
         public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) {
         }
     }
+
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index d029034..5326073 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -86,9 +86,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -270,7 +272,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return nodeId;
     }
 
-    private NodeIdentifier waitForElectedClusterCoordinator() {
+    public NodeIdentifier waitForElectedClusterCoordinator() {
         return waitForNodeIdentifier(() -> getElectedActiveCoordinatorNode(false));
     }
 
@@ -312,7 +314,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
             if (proposedStatus.getState() == NodeConnectionState.REMOVED) {
                 removeNode(nodeId);
             } else {
-                updateNodeStatus(nodeId, proposedStatus, false);
+                forcefullyUpdateNodeStatus(nodeId, proposedStatus, false);
             }
         }
 
@@ -339,11 +341,45 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         return removed;
     }
 
+    /**
+     * Updates the status of the node with the given ID to the given status if and only if the updated status's Update ID is >= the Update ID of the current
+     * Node status or if there is currently no node with the given identifier
+     * @param nodeId the NodeIdentifier for the node whose status is to be updated
+     * @param updatedStatus the new status for the node
+     * @return the previous status for the node
+     */
     private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus) {
         return updateNodeStatus(nodeId, updatedStatus, true);
     }
 
     private NodeConnectionStatus updateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
+        final String nodeUuid = nodeId.getId();
+
+        while (true) {
+            final NodeConnectionStatus currentStatus = nodeStatuses.get(nodeUuid);
+            if (currentStatus == null) {
+                onNodeAdded(nodeId, storeState);
+
+                // Return null because that was the previous state
+                return null;
+            }
+
+            if (currentStatus.getUpdateIdentifier() > updatedStatus.getUpdateIdentifier()) {
+                logger.debug("Received status update {} but ignoring it because it has an Update ID of {} and the current status has an Update ID of {}",
+                    updatedStatus, updatedStatus.getUpdateIdentifier(), currentStatus.getUpdateIdentifier());
+                return currentStatus;
+            }
+
+            final boolean updated = nodeStatuses.replace(nodeUuid, currentStatus, updatedStatus);
+            if (updated) {
+                onNodeStateChange(nodeId, updatedStatus.getState());
+                logger.info("Status of {} changed from {} to {}", nodeId, currentStatus, updatedStatus);
+                return currentStatus;
+            }
+        }
+    }
+
+    private NodeConnectionStatus forcefullyUpdateNodeStatus(final NodeIdentifier nodeId, final NodeConnectionStatus updatedStatus, final boolean storeState) {
         final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId.getId(), updatedStatus);
         if (evictedStatus == null) {
             onNodeAdded(nodeId, storeState);
@@ -502,12 +538,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     }
 
     @Override
-    public void requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
+    public Future<Void> requestNodeOffload(final NodeIdentifier nodeId, final OffloadCode offloadCode, final String explanation) {
         final Set<NodeIdentifier> offloadNodeIds = getNodeIdentifiers(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED);
         if (offloadNodeIds.contains(nodeId)) {
             logger.debug("Attempted to offload node but the node is already offloading or offloaded");
             // no need to do anything here, the node is currently offloading or already offloaded
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         final Set<NodeIdentifier> disconnectedNodeIds = getNodeIdentifiers(NodeConnectionState.DISCONNECTED);
@@ -524,11 +560,11 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         request.setExplanation(explanation);
 
         addNodeEvent(nodeId, "Offload requested due to " + explanation);
-        offloadAsynchronously(request, 10, 5);
+        return offloadAsynchronously(request, 10, 5);
     }
 
     @Override
-    public void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+    public Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
         final Set<NodeIdentifier> connectedNodeIds = getNodeIdentifiers(NodeConnectionState.CONNECTED);
         if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) {
             throw new IllegalNodeDisconnectionException("Cannot disconnect node " + nodeId + " because it is the only node currently connected");
@@ -541,7 +577,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         // There is no need to tell the node that it's disconnected if it is due to being
         // shutdown, as we will not be able to connect to the node anyway.
         if (disconnectionCode == DisconnectionCode.NODE_SHUTDOWN) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         final DisconnectMessage request = new DisconnectMessage();
@@ -549,7 +585,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         request.setExplanation(explanation);
 
         addNodeEvent(nodeId, "Disconnection requested due to " + explanation);
-        disconnectAsynchronously(request, 10, 5);
+        return disconnectAsynchronously(request, 10, 5);
     }
 
     @Override
@@ -834,12 +870,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
     void updateNodeStatus(final NodeConnectionStatus status, final boolean waitForCoordinator) {
         final NodeIdentifier nodeId = status.getNodeIdentifier();
 
-        // In this case, we are using nodeStatuses.put() instead of getting the current value and
+        // In this case, we are using nodeStatuses.put() (i.e., forcefully updating node status) instead of getting the current value and
         // comparing that to the new value and using the one with the largest update id. This is because
         // this method is called when something occurs that causes this node to change the status of the
         // node in question. We only use comparisons against the current value when we receive an update
         // about a node status from a different node, since those may be received out-of-order.
-        final NodeConnectionStatus currentStatus = updateNodeStatus(nodeId, status);
+        final NodeConnectionStatus currentStatus = forcefullyUpdateNodeStatus(nodeId, status, true);
         final NodeConnectionState currentState = currentStatus == null ? null : currentStatus.getState();
         if (Objects.equals(status, currentStatus)) {
             logger.debug("Received notification of Node Status Change for {} but the status remained the same: {}", nodeId, status);
@@ -901,19 +937,24 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         senderListener.notifyNodeStatusChange(nodesToNotify, message);
     }
 
-    private void offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
+    private Future<Void> offloadAsynchronously(final OffloadMessage request, final int attempts, final int retrySeconds) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+
         final Thread offloadThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 final NodeIdentifier nodeId = request.getNodeId();
 
+                Exception lastException = null;
                 for (int i = 0; i < attempts; i++) {
                     try {
                         senderListener.offload(request);
                         reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
+                        future.complete(null);
                         return;
                     } catch (final Exception e) {
                         logger.error("Failed to notify {} that it has been offloaded due to {}", request.getNodeId(), request.getExplanation(), e);
+                        lastException = e;
 
                         try {
                             Thread.sleep(retrySeconds * 1000L);
@@ -923,38 +964,50 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                         }
                     }
                 }
+
+                future.completeExceptionally(lastException);
             }
         }, "Offload " + request.getNodeId());
 
         offloadThread.start();
+        return future;
     }
 
-    private void disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
+    private Future<Void> disconnectAsynchronously(final DisconnectMessage request, final int attempts, final int retrySeconds) {
+        final CompletableFuture<Void> future = new CompletableFuture<>(); // completed by the disconnect thread started below
+
         final Thread disconnectThread = new Thread(new Runnable() {
             @Override
             public void run() {
                 final NodeIdentifier nodeId = request.getNodeId();
 
+                Exception lastException = null; // most recent failure; reported through the future if every attempt fails
                 for (int i = 0; i < attempts; i++) {
                     try {
                         senderListener.disconnect(request);
                         reportEvent(nodeId, Severity.INFO, "Node disconnected due to " + request.getExplanation());
+                        future.complete(null);
                         return;
                     } catch (final Exception e) {
-                        logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", request.getNodeId(), request.getExplanation());
+                        logger.error("Failed to notify {} that it has been disconnected from the cluster due to {}", request.getNodeId(), request.getExplanation(), e);
+                        lastException = e;
 
                         try {
                             Thread.sleep(retrySeconds * 1000L);
                         } catch (final InterruptedException ie) {
+                            future.completeExceptionally(ie); // release waiters before restoring the interrupt flag and giving up
                             Thread.currentThread().interrupt();
                             return;
                         }
                     }
                 }
+
+                future.completeExceptionally(lastException); // every attempt failed
             }
         }, "Disconnect " + request.getNodeId());
 
         disconnectThread.start();
+        return future;
     }
 
     public void validateHeartbeat(final NodeHeartbeat heartbeat) {
@@ -1092,12 +1145,12 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         if (statusChangeMessage.getNodeConnectionStatus().getState() == NodeConnectionState.REMOVED) {
             if (removeNodeConditionally(nodeId, oldStatus)) {
                 storeState();
+                logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
             }
         } else {
             updateNodeStatus(nodeId, updatedStatus);
         }
 
-        logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
         logger.debug("State of cluster nodes is now {}", nodeStatuses);
 
         final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
new file mode 100644
index 0000000..592e51c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.lifecycle;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.DecommissionTask;
+import org.apache.nifi.controller.FlowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link DecommissionTask} that takes the local node out of its cluster: the node is disconnected,
+ * offloaded, and finally removed, with the Cluster Coordinator's view of the node's connection state
+ * polled between steps.
+ */
+public class ClusterDecommissionTask implements DecommissionTask {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterDecommissionTask.class);
+    private static final int delaySeconds = 3; // pause between successive polls of the node's connection state
+
+    private final ClusterCoordinator clusterCoordinator;
+    private final FlowController flowController;
+    private NodeIdentifier localNodeIdentifier; // resolved at the start of decommission()
+
+    public ClusterDecommissionTask(final ClusterCoordinator clusterCoordinator, final FlowController flowController) {
+        this.clusterCoordinator = clusterCoordinator;
+        this.flowController = flowController;
+    }
+
+    /**
+     * Runs the decommission sequence, blocking until each step has completed: stop heartbeating,
+     * disconnect the node, offload it, and remove it from the cluster.
+     *
+     * @throws IllegalStateException if there is no Cluster Coordinator, the local node identifier is not yet known,
+     *             or the node's connection status disappears or leaves the expected state while waiting
+     * @throws InterruptedException if the calling thread is interrupted while waiting on any step
+     */
+    @Override
+    public synchronized void decommission() throws InterruptedException {
+        if (clusterCoordinator == null) {
+            throw new IllegalStateException("Cannot decommission Node because it is not part of a cluster");
+        }
+
+        logger.info("Decommissioning Node...");
+        localNodeIdentifier = clusterCoordinator.getLocalNodeIdentifier();
+        if (localNodeIdentifier == null) {
+            throw new IllegalStateException("Node has not yet connected to the cluster");
+        }
+
+        flowController.stopHeartbeating();
+        flowController.setClustered(false, null);
+        logger.info("Instructed FlowController to stop sending heartbeats to Cluster Coordinator and take Cluster Disconnect actions");
+
+        disconnectNode();
+        logger.info("Requested that node be disconnected from cluster");
+
+        waitForDisconnection();
+        logger.info("Successfully disconnected node from cluster");
+
+        offloadNode();
+        logger.info("Successfully triggered Node Offload. Will wait for offload to complete");
+
+        waitForOffloadToFinish();
+        logger.info("Offload has successfully completed.");
+
+        removeFromCluster();
+        logger.info("Requested that node be removed from cluster.");
+
+        waitForRemoval();
+        logger.info("Node successfully removed from cluster. Decommission is complete.");
+    }
+
+    /**
+     * Asks the Cluster Coordinator to disconnect the local node and blocks until the request completes.
+     * A failed request is logged and a new request is issued, with no upper bound on the number of requests.
+     */
+    private void disconnectNode() throws InterruptedException {
+        logger.info("Requesting that Node disconnect from cluster");
+
+        while (true) {
+            final Future<Void> future = clusterCoordinator.requestNodeDisconnect(localNodeIdentifier, DisconnectionCode.USER_DISCONNECTED, "Node is being decommissioned");
+            try {
+                future.get();
+                return;
+            } catch (final ExecutionException e) {
+                final Throwable cause = e.getCause();
+                logger.error("Failed when attempting to disconnect node from cluster", cause);
+            }
+        }
+    }
+
+    /** Blocks until the Cluster Coordinator reports the local node as DISCONNECTED. */
+    private void waitForDisconnection() throws InterruptedException {
+        logger.info("Waiting for Node to be completely disconnected from cluster");
+        waitForState(Collections.singleton(NodeConnectionState.DISCONNECTED));
+    }
+
+    /**
+     * Asks the Cluster Coordinator to offload the local node, issuing a new request each time the previous
+     * one fails, and then blocks until the node is reported as OFFLOADING or OFFLOADED.
+     */
+    private void offloadNode() throws InterruptedException {
+        logger.info("Requesting that Node be offloaded");
+
+        while (true) {
+            final Future<Void> future = clusterCoordinator.requestNodeOffload(localNodeIdentifier, OffloadCode.OFFLOADED, "Node is being decommissioned");
+            try {
+                future.get();
+                break;
+            } catch (final ExecutionException e) {
+                final Throwable cause = e.getCause();
+                logger.error("Failed when attempting to offload node", cause);
+            }
+        }
+
+        // Wait until status changes to either OFFLOADING or OFFLOADED.
+        waitForState(new HashSet<>(Arrays.asList(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED)));
+    }
+
+    /**
+     * Polls the local node's connection state every {@code delaySeconds} seconds until it is one of the given states.
+     *
+     * @param acceptableStates the states that end the wait
+     * @throws IllegalStateException if the Cluster Coordinator has no connection status for the local node
+     */
+    private void waitForState(final Set<NodeConnectionState> acceptableStates) throws InterruptedException {
+        while (true) {
+            final NodeConnectionState state = getLocalNodeState();
+            logger.debug("Node state is {}", state);
+
+            if (acceptableStates.contains(state)) {
+                return;
+            }
+
+            TimeUnit.SECONDS.sleep(delaySeconds);
+        }
+    }
+
+    /**
+     * Polls until the local node is reported as OFFLOADED.
+     *
+     * @throws IllegalStateException if the node is in a state other than OFFLOADING or OFFLOADED, or has no status at all
+     */
+    private void waitForOffloadToFinish() throws InterruptedException {
+        logger.info("Waiting for Node to finish offloading");
+
+        while (true) {
+            final NodeConnectionState state = getLocalNodeState();
+            if (state == NodeConnectionState.OFFLOADED) {
+                return;
+            }
+
+            if (state != NodeConnectionState.OFFLOADING) {
+                throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + state);
+            }
+
+            logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", delaySeconds);
+            TimeUnit.SECONDS.sleep(delaySeconds);
+        }
+    }
+
+    /**
+     * Looks up the local node's current connection state as seen by the Cluster Coordinator.
+     *
+     * @throws IllegalStateException if the coordinator has no status for the node; failing here gives a clear
+     *             message instead of a NullPointerException inside the polling loops
+     */
+    private NodeConnectionState getLocalNodeState() {
+        final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+        if (status == null) {
+            throw new IllegalStateException("Cluster Coordinator has no connection status for Node " + localNodeIdentifier);
+        }
+
+        return status.getState();
+    }
+
+    /** Asks the Cluster Coordinator to remove the local node from the cluster. */
+    private void removeFromCluster() {
+        clusterCoordinator.removeNode(localNodeIdentifier, "<Local Decommission>");
+    }
+
+    /** Polls until the Cluster Coordinator either has no status for the local node or reports it as REMOVED. */
+    private void waitForRemoval() throws InterruptedException {
+        logger.info("Waiting for Node to be completely removed from cluster");
+
+        while (true) {
+            final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(localNodeIdentifier);
+            if (status == null) {
+                return;
+            }
+
+            final NodeConnectionState state = status.getState();
+            if (state == NodeConnectionState.REMOVED) {
+                return;
+            }
+
+            logger.debug("Node state is {}. Will wait {} seconds and check again", state, delaySeconds);
+            TimeUnit.SECONDS.sleep(delaySeconds);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
index c1a7665..a4c2573 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml
@@ -51,9 +51,15 @@
         <property name="properties" ref="nifiProperties"/>
         <property name="extensionManager" ref="extensionManager" />
     </bean>
-    
+
     <!-- Heartbeat Monitor -->
     <bean id="heartbeatMonitor" class="org.apache.nifi.cluster.spring.HeartbeatMonitorFactoryBean">
         <property name="properties" ref="nifiProperties"/>
     </bean>
+
+    <bean id="decommissionTask" class="org.apache.nifi.cluster.lifecycle.ClusterDecommissionTask">
+        <constructor-arg ref="clusterCoordinator" />
+        <constructor-arg ref="flowController" />
+    </bean>
+
 </beans>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 98aa0b4..5efb9ca 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -20,11 +20,11 @@ package org.apache.nifi.cluster.coordination.heartbeat;
 import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
-import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.coordination.node.NodeWorkload;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.reporting.Severity;
@@ -45,7 +45,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
@@ -119,9 +121,10 @@ public class TestAbstractHeartbeatMonitor {
             }
 
             @Override
-            public synchronized void requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
+            public synchronized Future<Void> requestNodeDisconnect(final NodeIdentifier nodeId, final DisconnectionCode disconnectionCode, final String explanation) {
                 super.requestNodeDisconnect(nodeId, disconnectionCode, explanation);
                 requestedToDisconnect.add(nodeId);
+                return CompletableFuture.completedFuture(null); // the Future returned by super is not used; an already-completed one is returned
             }
         };
 
@@ -253,13 +256,15 @@ public class TestAbstractHeartbeatMonitor {
         }
 
         @Override
-        public synchronized void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
+        public synchronized Future<Void> requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, String explanation) {
             statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED));
+            return CompletableFuture.completedFuture(null); // status was recorded synchronously above, so the future is already complete
         }
 
         @Override
-        public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
+        public synchronized Future<Void> requestNodeDisconnect(NodeIdentifier nodeId, DisconnectionCode disconnectionCode, String explanation) {
             statuses.put(nodeId, new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED));
+            return CompletableFuture.completedFuture(null); // status was recorded synchronously above, so the future is already complete
         }
 
         @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2cb5e28..5154963 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2223,6 +2223,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
 
             if (heartbeatSenderFuture != null) {
+                LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator");
                 heartbeatSenderFuture.cancel(false);
             }
         } finally {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index 833075f..c2b5143 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -28,6 +28,7 @@ import org.apache.nifi.authorization.exception.AuthorizationAccessException;
 import org.apache.nifi.authorization.exception.AuthorizerCreationException;
 import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.controller.FlowController;
 import org.apache.nifi.controller.StandardFlowService;
 import org.apache.nifi.controller.flow.FlowManager;
@@ -193,6 +194,11 @@ public class HeadlessNiFiServer implements NiFiServer {
         return new ThreadDumpDiagnosticsFactory();
     }
 
+    @Override
+    public DecommissionTask getDecommissionTask() {
+        return null; // the headless server provides no decommission task
+    }
+
     public void stop() {
         try {
             flowService.stop(false);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
index f1d1f59..63fbfec 100755
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/bin/nifi.sh
@@ -430,7 +430,7 @@ case "$1" in
         install "$@"
         ;;
 
-    start|stop|run|status|is_loaded|dump|diagnostics|env|stateless)
+    start|stop|decommission|run|status|is_loaded|dump|diagnostics|env|stateless)
         main "$@"
         ;;
 
@@ -440,6 +440,6 @@ case "$1" in
         run "start"
         ;;
     *)
-        echo "Usage nifi {start|stop|run|restart|status|dump|diagnostics|install|stateless}"
+        echo "Usage nifi {start|stop|decommission|run|restart|status|dump|diagnostics|install|stateless}"
         ;;
 esac
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 9eeebaf..b9b12ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi;
 
+import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.diagnostics.DiagnosticsDump;
 import org.apache.nifi.util.LimitingInputStream;
 import org.slf4j.Logger;
@@ -203,6 +204,23 @@ public class BootstrapListener {
                                         logger.info("Received DUMP request from Bootstrap");
                                         writeDump(socket.getOutputStream());
                                         break;
+                                    case DECOMMISSION:
+                                        logger.info("Received DECOMMISSION request from Bootstrap");
+
+                                        try {
+                                            decommission();
+                                            sendAnswer(socket.getOutputStream(), "DECOMMISSION");
+                                            nifi.shutdownHook(false);
+                                        } catch (final Exception e) {
+                                            final OutputStream out = socket.getOutputStream();
+
+                                            out.write(("Failed to decommission node: " + e + "; see app-log for additional details").getBytes(StandardCharsets.UTF_8));
+                                            out.flush();
+                                        } finally {
+                                            socket.close();
+                                        }
+
+                                        break;
                                     case DIAGNOSTICS:
                                         logger.info("Received DIAGNOSTICS request from Bootstrap");
                                         final String[] args = request.getArgs();
@@ -250,6 +268,15 @@ public class BootstrapListener {
         diagnosticsDump.writeTo(out);
     }
 
+    private void decommission() throws InterruptedException {
+        final DecommissionTask decommissionTask = nifi.getServer().getDecommissionTask();
+        if (decommissionTask == null) { // the server implementation supplied no task
+            throw new IllegalArgumentException("This NiFi instance does not support decommissioning");
+        }
+
+        decommissionTask.decommission(); // declared to throw InterruptedException, which propagates to the caller
+    }
+
     private void writeDiagnostics(final OutputStream out, final boolean verbose) throws IOException {
         final DiagnosticsDump diagnosticsDump = nifi.getServer().getDiagnosticsFactory().create(verbose);
         diagnosticsDump.writeTo(out);
@@ -304,6 +331,7 @@ public class BootstrapListener {
             SHUTDOWN,
             DUMP,
             DIAGNOSTICS,
+            DECOMMISSION,
             PING,
             IS_LOADED
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
index 3804ea7..efcf271 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-jetty/src/main/java/org/apache/nifi/web/server/JettyServer.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.NiFiServer;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleDetails;
+import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.serialization.FlowSerializationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
@@ -153,6 +154,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
     private NarAutoLoader narAutoLoader;
     private DiagnosticsFactory diagnosticsFactory;
     private SslContextFactory.Server sslContextFactory;
+    private DecommissionTask decommissionTask;
 
     private WebAppContext webApiContext;
     private WebAppContext webDocsContext;
@@ -1172,6 +1174,7 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
                 }
 
                 diagnosticsFactory = webApplicationContext.getBean("diagnosticsFactory", DiagnosticsFactory.class);
+                decommissionTask = webApplicationContext.getBean("decommissionTask", DecommissionTask.class);
             }
 
             // ensure the web document war was loaded and provide the extension mapping
@@ -1245,6 +1248,11 @@ public class JettyServer implements NiFiServer, ExtensionUiLoader {
         return new ThreadDumpDiagnosticsFactory();
     }
 
+    @Override
+    public DecommissionTask getDecommissionTask() {
+        return decommissionTask; // NOTE(review): looks like this stays null until the "decommissionTask" bean lookup has run - confirm
+    }
+
     private void performInjectionForComponentUis(final Collection<WebAppContext> componentUiExtensionWebContexts,
                                                  final NiFiWebConfigurationContext configurationContext, final FilterHolder securityFilter) {
         if (CollectionUtils.isNotEmpty(componentUiExtensionWebContexts)) {
diff --git a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
index 8124876..cc87079 100644
--- a/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
+++ b/nifi-server-api/src/main/java/org/apache/nifi/NiFiServer.java
@@ -17,6 +17,7 @@
 package org.apache.nifi;
 
 import org.apache.nifi.bundle.Bundle;
+import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.diagnostics.DiagnosticsFactory;
 import org.apache.nifi.nar.ExtensionMapping;
 import org.apache.nifi.util.NiFiProperties;
@@ -37,4 +38,6 @@ public interface NiFiServer {
     DiagnosticsFactory getDiagnosticsFactory();
 
     DiagnosticsFactory getThreadDumpFactory();
+
+    DecommissionTask getDecommissionTask();
 }