You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by be...@apache.org on 2023/01/12 15:04:27 UTC

[nifi] branch main updated: NIFI-10895 Update properties command for MiNiFi C2

This is an automated email from the ASF dual-hosted git repository.

bejancsaba pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8807a9d377 NIFI-10895 Update properties command for MiNiFi C2
8807a9d377 is described below

commit 8807a9d377b648eb30cd5823507ac81e58df4fa4
Author: Ferenc Erdei <er...@gmail.com>
AuthorDate: Wed Nov 30 10:12:35 2022 +0100

    NIFI-10895 Update properties command for MiNiFi C2
    
    Signed-off-by: Csaba Bejan <be...@gmail.com>
    
    This closes #6733.
---
 .../org/apache/nifi/c2/client/C2ClientConfig.java  |  24 ++
 .../apache/nifi/c2/client/http/C2HttpClient.java   |   2 +
 .../nifi/c2/client/http/C2HttpClientTest.java      |   7 +-
 .../nifi/c2/client/service/C2ClientService.java    | 102 ++++++++-
 .../service/operation/C2OperationHandler.java      |   8 +
 ...ervice.java => C2OperationHandlerProvider.java} |  24 +-
 .../client/service/operation/OperationQueue.java   |  80 +++++++
 .../service/operation/RequestedOperationDAO.java}  |  39 ++--
 .../UpdateConfigurationOperationHandler.java       |   5 +
 .../UpdatePropertiesOperationHandler.java          |  92 ++++++++
 .../c2/client/service/C2ClientServiceTest.java     | 147 +++++++++++-
 ...st.java => C2OperationHandlerProviderTest.java} |  35 +--
 .../UpdatePropertiesOperationHandlerTest.java      | 130 +++++++++++
 .../apache/nifi/c2/protocol/api/C2Operation.java   |  29 +++
 .../nifi/c2/protocol/api/C2OperationAck.java       |  30 +++
 .../nifi/c2/protocol/api/C2OperationState.java     |  25 +++
 .../apache/nifi/c2/protocol/api/OperandType.java   |   1 +
 .../apache/nifi/c2/protocol/api/OperationType.java |   3 +-
 minifi/minifi-bootstrap/pom.xml                    |   5 +
 .../apache/nifi/minifi/bootstrap/RunMiNiFi.java    |  36 ++-
 .../bootstrap/command/CommandRunnerFactory.java    |  10 +-
 .../nifi/minifi/bootstrap/command/StartRunner.java | 145 ++++++++----
 .../ConfigurationChangeCoordinator.java            |  12 +-
 .../differentiators/Differentiator.java            |   4 +-
 .../differentiators/WholeConfigDifferentiator.java |  10 +-
 .../ingestors/FileChangeIngestor.java              |   2 +-
 .../ingestors/PullHttpChangeIngestor.java          |   2 +-
 .../ingestors/RestChangeIngestor.java              |   2 +-
 .../minifi/bootstrap/service/BootstrapCodec.java   |  90 +++++---
 .../bootstrap/service/BootstrapFileProvider.java   |  19 +-
 .../service/MiNiFiConfigurationChangeListener.java |   2 +-
 .../minifi/bootstrap/service/MiNiFiListener.java   |  15 +-
 .../service/UpdateConfigurationService.java        |  95 ++++++++
 .../bootstrap/service/UpdatePropertiesService.java | 107 +++++++++
 .../minifi/bootstrap/util/ConfigTransformer.java   |   6 +
 .../command/CommandRunnerFactoryTest.java          |  35 +--
 .../WholeConfigDifferentiatorTest.java             |  10 +-
 .../bootstrap/service/BootstrapCodecTest.java      |  87 +++++--
 minifi/minifi-commons/minifi-commons-api/pom.xml   |  32 +++
 .../minifi/commons/api/MiNiFiCommandState.java     |  29 +--
 .../nifi/minifi/commons/api/MiNiFiConstants.java   |  27 +--
 minifi/minifi-commons/pom.xml                      |   1 +
 minifi/minifi-docker/pom.xml                       |   1 +
 .../main/markdown/minifi-java-agent-quick-start.md |  11 +-
 .../c2/hierarchical/minifi-edge1/expected.json     |   2 +-
 .../c2/hierarchical/minifi-edge2/expected.json     |   2 +-
 .../c2/hierarchical/minifi-edge3/expected.json     |   2 +-
 .../minifi-framework/minifi-framework-core/pom.xml |   5 +
 .../org/apache/nifi/minifi/MiNiFiProperties.java   | 151 +++++++++++++
 .../apache/nifi/minifi/c2/C2NiFiProperties.java    |  79 -------
 .../apache/nifi/minifi/c2/C2NifiClientService.java | 250 +++++++++++++++++----
 .../minifi/c2/FileBasedRequestedOperationDAO.java  |  73 ++++++
 .../c2/command/AgentPropertyValidationContext.java | 111 +++++++++
 .../minifi/c2/command/PropertiesPersister.java     | 143 ++++++++++++
 .../nifi/minifi/c2/command/UpdatableProperty.java  |  75 +++++++
 .../command/UpdatePropertiesPropertyProvider.java  |  78 +++++++
 .../c2/FileBasedRequestedOperationDAOTest.java     | 114 ++++++++++
 .../minifi/c2/command/PropertiesPersisterTest.java | 152 +++++++++++++
 .../UpdatePropertiesPropertyProviderTest.java      |  87 +++++++
 .../src/main/resources/conf/bootstrap.conf         |   4 -
 .../apache/nifi/minifi/StandardMiNiFiServer.java   |  20 +-
 61 files changed, 2501 insertions(+), 425 deletions(-)

diff --git a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
index 2d823320c3..360e5287a2 100644
--- a/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
+++ b/c2/c2-client-bundle/c2-client-base/src/main/java/org/apache/nifi/c2/client/C2ClientConfig.java
@@ -40,6 +40,8 @@ public class C2ClientConfig {
     private final long callTimeout;
     private final long readTimeout;
     private final long connectTimeout;
+    private final int maxIdleConnections;
+    private final long keepAliveDuration;
     private final String c2RequestCompression;
     private final String c2AssetDirectory;
 
@@ -63,6 +65,8 @@ public class C2ClientConfig {
         this.truststoreType = builder.truststoreType;
         this.readTimeout = builder.readTimeout;
         this.connectTimeout = builder.connectTimeout;
+        this.maxIdleConnections = builder.maxIdleConnections;
+        this.keepAliveDuration = builder.keepAliveDuration;
         this.c2RequestCompression = builder.c2RequestCompression;
         this.c2AssetDirectory = builder.c2AssetDirectory;
     }
@@ -151,6 +155,14 @@ public class C2ClientConfig {
         return c2AssetDirectory;
     }
 
+    public int getMaxIdleConnections() {
+        return maxIdleConnections;
+    }
+
+    public long getKeepAliveDuration() {
+        return keepAliveDuration;
+    }
+
     /**
      * Builder for client configuration.
      */
@@ -175,6 +187,8 @@ public class C2ClientConfig {
         private String truststoreType;
         private long readTimeout;
         private long connectTimeout;
+        private int maxIdleConnections;
+        private long keepAliveDuration;
         private String c2RequestCompression;
         private String c2AssetDirectory;
 
@@ -273,6 +287,16 @@ public class C2ClientConfig {
             return this;
         }
 
+        public Builder maxIdleConnections(final int maxIdleConnections) {
+            this.maxIdleConnections = maxIdleConnections;
+            return this;
+        }
+
+        public Builder keepAliveDuration(final long keepAliveDuration) {
+            this.keepAliveDuration = keepAliveDuration;
+            return this;
+        }
+
         public Builder c2RequestCompression(final String c2RequestCompression) {
             this.c2RequestCompression = c2RequestCompression;
             return this;
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index 293851c0f5..d881fa5dff 100644
--- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.X509TrustManager;
+import okhttp3.ConnectionPool;
 import okhttp3.MediaType;
 import okhttp3.MultipartBody;
 import okhttp3.OkHttpClient;
@@ -74,6 +75,7 @@ public class C2HttpClient implements C2Client {
 
         // Set whether to follow redirects
         okHttpClientBuilder.followRedirects(true);
+        okHttpClientBuilder.connectionPool(new ConnectionPool(clientConfig.getMaxIdleConnections(), clientConfig.getKeepAliveDuration(), TimeUnit.MILLISECONDS));
 
         // Timeouts
         okHttpClientBuilder.connectTimeout(clientConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);
diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
index aec67b2f0e..28ceef2d1f 100644
--- a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
+++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
@@ -39,7 +39,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -51,6 +50,8 @@ public class C2HttpClientTest {
     private static final String ACK_PATH = "c2/acknowledge";
     private static final int HTTP_STATUS_OK = 200;
     private static final int HTTP_STATUS_BAD_REQUEST = 400;
+    private static final long KEEP_ALIVE_DURATION = 5000l;
+    private static final int MAX_IDLE_CONNECTIONS = 5;
 
     @Mock
     private C2ClientConfig c2ClientConfig;
@@ -58,7 +59,6 @@ public class C2HttpClientTest {
     @Mock
     private C2Serializer serializer;
 
-    @InjectMocks
     private C2HttpClient c2HttpClient;
 
     private MockWebServer mockWebServer;
@@ -69,6 +69,9 @@ public class C2HttpClientTest {
     public void startServer() {
         mockWebServer = new MockWebServer();
         baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
+        when(c2ClientConfig.getKeepAliveDuration()).thenReturn(KEEP_ALIVE_DURATION);
+        when(c2ClientConfig.getMaxIdleConnections()).thenReturn(MAX_IDLE_CONNECTIONS);
+        c2HttpClient = new C2HttpClient(c2ClientConfig, serializer);
     }
 
     @AfterEach
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
index 67d406086e..e8f0718cbd 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
@@ -16,13 +16,24 @@
  */
 package org.apache.nifi.c2.client.service;
 
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
 import org.apache.nifi.c2.client.api.C2Client;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
 import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,23 +43,73 @@ public class C2ClientService {
 
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
-    private final C2OperationService operationService;
+    private final C2OperationHandlerProvider c2OperationHandlerProvider;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final Consumer<C2Operation> c2OperationRegister;
+    private volatile boolean heartbeatLocked = false;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationHandlerProvider c2OperationHandlerProvider,
+        RequestedOperationDAO requestedOperationDAO, Consumer<C2Operation> c2OperationRegister) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
-        this.operationService = operationService;
+        this.c2OperationHandlerProvider = c2OperationHandlerProvider;
+        this.requestedOperationDAO = requestedOperationDAO;
+        this.c2OperationRegister = c2OperationRegister;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
+            if (heartbeatLocked) {
+                logger.debug("Heartbeats are locked, skipping sending for now");
+            } else {
+                try {
+                    C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
+                    client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+                } catch (Exception e) {
+                    logger.error("Failed to send/process heartbeat:", e);
+                }
+            }
+    }
+
+    public void sendAcknowledge(C2OperationAck operationAck) {
         try {
-            C2Heartbeat c2Heartbeat = c2HeartbeatFactory.create(runtimeInfoWrapper);
-            client.publishHeartbeat(c2Heartbeat).ifPresent(this::processResponse);
+            client.acknowledgeOperation(operationAck);
         } catch (Exception e) {
-            logger.error("Failed to send/process heartbeat:", e);
+            logger.error("Failed to send acknowledge:", e);
         }
     }
 
+    public void enableHeartbeat() {
+        heartbeatLocked = false;
+    }
+
+    private void disableHeartbeat() {
+        heartbeatLocked = true;
+    }
+
+    public void handleRequestedOperations(List<C2Operation> requestedOperations) {
+        LinkedList<C2Operation> c2Operations = new LinkedList<>(requestedOperations);
+        C2Operation requestedOperation;
+        while ((requestedOperation = c2Operations.poll()) != null) {
+            Optional<C2OperationHandler> c2OperationHandler = c2OperationHandlerProvider.getHandlerForOperation(requestedOperation);
+            if (!c2OperationHandler.isPresent()) {
+                continue;
+            }
+            C2OperationHandler operationHandler = c2OperationHandler.get();
+            C2OperationAck c2OperationAck = operationHandler.handle(requestedOperation);
+            if (requiresRestart(operationHandler, c2OperationAck)) {
+                if (initiateRestart(c2Operations, requestedOperation)) {
+                    return;
+                }
+                C2OperationState c2OperationState = new C2OperationState();
+                c2OperationState.setState(OperationState.NOT_APPLIED);
+                c2OperationAck.setOperationState(c2OperationState);
+            }
+            sendAcknowledge(c2OperationAck);
+        }
+        enableHeartbeat();
+        requestedOperationDAO.cleanup();
+    }
+
     private void processResponse(C2HeartbeatResponse response) {
         List<C2Operation> requestedOperations = response.getRequestedOperations();
         if (requestedOperations != null && !requestedOperations.isEmpty()) {
@@ -59,11 +120,30 @@ public class C2ClientService {
         }
     }
 
-    private void handleRequestedOperations(List<C2Operation> requestedOperations) {
-        for (C2Operation requestedOperation : requestedOperations) {
-            operationService.handleOperation(requestedOperation)
-                .ifPresent(client::acknowledgeOperation);
+    private boolean requiresRestart(C2OperationHandler c2OperationHandler, C2OperationAck c2OperationAck) {
+        return c2OperationHandler.requiresRestart() && isOperationFullyApplied(c2OperationAck);
+    }
+
+    private boolean isOperationFullyApplied(C2OperationAck c2OperationAck) {
+        return Optional.ofNullable(c2OperationAck)
+            .map(C2OperationAck::getOperationState)
+            .map(C2OperationState::getState)
+            .filter(FULLY_APPLIED::equals)
+            .isPresent();
+    }
+
+    private boolean initiateRestart(LinkedList<C2Operation> requestedOperations, C2Operation requestedOperation) {
+        try {
+            disableHeartbeat();
+            requestedOperationDAO.save(new OperationQueue(requestedOperation, requestedOperations));
+            c2OperationRegister.accept(requestedOperation);
+            return true;
+        } catch (Exception e) {
+            logger.error("Failed to initiate restart. Dropping operation and continue with remaining operations", e);
+            requestedOperationDAO.cleanup();
         }
+        return false;
     }
+
 }
 
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
index 586f0e624a..d8668acaa0 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandler.java
@@ -48,6 +48,14 @@ public interface C2OperationHandler {
      */
     Map<String, Object> getProperties();
 
+    /**
+     * Determines if the given operation requires to restart the MiNiFi process
+     * @return true if it requires restart, false otherwise
+     */
+    default boolean requiresRestart() {
+        return false;
+    }
+
     /**
      * Handler logic for the specific C2Operation
      *
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
similarity index 72%
rename from c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
rename to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
index 31e483929d..e56d55d865 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProvider.java
@@ -22,32 +22,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.nifi.c2.protocol.api.C2Operation;
-import org.apache.nifi.c2.protocol.api.C2OperationAck;
 import org.apache.nifi.c2.protocol.api.OperandType;
 import org.apache.nifi.c2.protocol.api.OperationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class C2OperationService {
+public class C2OperationHandlerProvider {
 
-    private static final Logger logger = LoggerFactory.getLogger(C2OperationService.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(C2OperationHandlerProvider.class);
 
     private final Map<OperationType, Map<OperandType, C2OperationHandler>> handlerMap = new HashMap<>();
 
-    public C2OperationService(List<C2OperationHandler> handlers) {
+    public C2OperationHandlerProvider(List<C2OperationHandler> handlers) {
         for (C2OperationHandler handler : handlers) {
             handlerMap.computeIfAbsent(handler.getOperationType(), x -> new HashMap<>()).put(handler.getOperandType(), handler);
         }
     }
 
-    public Optional<C2OperationAck> handleOperation(C2Operation operation) {
-        return getHandlerForOperation(operation)
-            .map(handler -> {
-                logger.info("Handling {} {} operation", operation.getOperation(), operation.getOperand());
-                return handler.handle(operation);
-            });
-    }
-
     public Map<OperationType, Map<OperandType, C2OperationHandler>> getHandlers() {
         Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = new HashMap<>();
         handlerMap.entrySet()
@@ -61,8 +52,11 @@ public class C2OperationService {
         return Collections.unmodifiableMap(handlers);
     }
 
-    private Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) {
-        return Optional.ofNullable(handlerMap.get(operation.getOperation()))
-            .map(operandMap -> operandMap.get(operation.getOperand()));
+    public Optional<C2OperationHandler> getHandlerForOperation(C2Operation operation) {
+        Optional<C2OperationHandler> handler = Optional.ofNullable(handlerMap.get(operation.getOperation())).map(operandMap -> operandMap.get(operation.getOperand()));
+        if (!handler.isPresent()) {
+            LOGGER.warn("No handler found for {} {} operation", operation.getOperation(), operation.getOperand());
+        }
+        return handler;
     }
 }
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
new file mode 100644
index 0000000000..ae0c56c1a8
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/OperationQueue.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+
+public class OperationQueue implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private C2Operation currentOperation;
+    private List<C2Operation> remainingOperations;
+
+    public OperationQueue() {
+    }
+
+    public OperationQueue(C2Operation currentOperation, List<C2Operation> remainingOperations) {
+        this.currentOperation = currentOperation;
+        this.remainingOperations = remainingOperations == null ? Collections.emptyList() : remainingOperations;
+    }
+
+    public C2Operation getCurrentOperation() {
+        return currentOperation;
+    }
+
+    public List<C2Operation> getRemainingOperations() {
+        return remainingOperations;
+    }
+
+    public void setCurrentOperation(C2Operation currentOperation) {
+        this.currentOperation = currentOperation;
+    }
+
+    public void setRemainingOperations(List<C2Operation> remainingOperations) {
+        this.remainingOperations = remainingOperations;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        OperationQueue that = (OperationQueue) o;
+        return Objects.equals(currentOperation, that.currentOperation) && Objects.equals(remainingOperations, that.remainingOperations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(currentOperation, remainingOperations);
+    }
+
+    @Override
+    public String toString() {
+        return "OperationQueue{" +
+            "currentOperation=" + currentOperation +
+            ", remainingOperations=" + remainingOperations +
+            '}';
+    }
+}
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
similarity index 57%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
index 1f1cb19035..1216aa812d 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/RequestedOperationDAO.java
@@ -15,28 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.c2.client.service.operation;
 
-import java.util.Arrays;
 import java.util.Optional;
 
-public enum OperandType {
+/**
+ * The purpose of this interface is to be able to persist operations between restarts.
+ */
+public interface RequestedOperationDAO {
+
+    /**
+     * Persist the given requested operation list
+     * @param operationQueue the queue containing the current and remaining operations
+     */
+    void save(OperationQueue operationQueue);
 
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    ASSET;
+    /**
+     * Returns the saved Operations
+     *
+     * @return the C2 Operations queue with the actual operation
+     */
+    Optional<OperationQueue> load();
 
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
+    /**
+     * Resets the saved operations
+     */
+    void cleanup();
 
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
 }
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
index f04af88714..db3ef8a86d 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
@@ -136,4 +136,9 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
     public Map<String, Object> getProperties() {
         return operandPropertiesProvider.getProperties();
     }
+
+    @Override
+    public boolean requiresRestart() {
+        return true;
+    }
 }
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java
new file mode 100644
index 0000000000..2a9ac64cc2
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandler.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.FULLY_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NOT_APPLIED;
+import static org.apache.nifi.c2.protocol.api.C2OperationState.OperationState.NO_OPERATION;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesOperationHandler implements C2OperationHandler {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesOperationHandler.class);
+
+    private final OperandPropertiesProvider operandPropertiesProvider;
+    private final Function<Map<String, String>, Boolean> persistProperties;
+
+    public UpdatePropertiesOperationHandler(OperandPropertiesProvider operandPropertiesProvider, Function<Map<String, String>, Boolean> persistProperties) {
+        this.operandPropertiesProvider = operandPropertiesProvider;
+        this.persistProperties = persistProperties;
+    }
+
+    @Override
+    public OperationType getOperationType() {
+        return UPDATE;
+    }
+
+    @Override
+    public OperandType getOperandType() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        return operandPropertiesProvider.getProperties();
+    }
+
+    @Override
+    public C2OperationAck handle(C2Operation operation) {
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        c2OperationAck.setOperationId(operation.getIdentifier());
+        C2OperationState operationState = new C2OperationState();
+        c2OperationAck.setOperationState(operationState);
+        try {
+            if (persistProperties.apply(operation.getArgs())) {
+                operationState.setState(FULLY_APPLIED);
+            } else {
+                LOGGER.info("Properties are already in desired state");
+                operationState.setState(NO_OPERATION);
+            }
+        } catch (IllegalArgumentException e) {
+            LOGGER.error(e.getMessage());
+            operationState.setState(NOT_APPLIED);
+            operationState.setDetails(e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Exception happened during persisting properties", e);
+            operationState.setState(NOT_APPLIED);
+            operationState.setDetails("Failed to persist properties");
+        }
+        return c2OperationAck;
+    }
+
+    @Override
+    public boolean requiresRestart() {
+        return true;
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
index 37bd572dd1..6de1f86015 100644
--- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
@@ -17,22 +17,33 @@
 package org.apache.nifi.c2.client.service;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.nifi.c2.client.api.C2Client;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandler;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
 import org.apache.nifi.c2.protocol.api.C2Operation;
 import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.InjectMocks;
@@ -49,11 +60,17 @@ public class C2ClientServiceTest {
     private C2HeartbeatFactory c2HeartbeatFactory;
 
     @Mock
-    private C2OperationService operationService;
+    private C2OperationHandlerProvider operationService;
 
     @Mock
     private RuntimeInfoWrapper runtimeInfoWrapper;
 
+    @Mock
+    private RequestedOperationDAO requestedOperationDAO;
+
+    @Mock
+    private Consumer<C2Operation> c2OperationRegister;
+
     @InjectMocks
     private C2ClientService c2ClientService;
 
@@ -62,15 +79,18 @@ public class C2ClientServiceTest {
         C2Heartbeat heartbeat = mock(C2Heartbeat.class);
         when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
         C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
-        hbResponse.setRequestedOperations(generateOperation(1));
+        final List<C2Operation> c2Operations = generateOperation(1);
+        hbResponse.setRequestedOperations(c2Operations);
         when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+        when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
+        when(c2OperationHandler.handle(c2Operations.get(0))).thenReturn(new C2OperationAck());
 
         c2ClientService.sendHeartbeat(runtimeInfoWrapper);
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService).handleOperation(any());
+        verify(c2OperationHandler).handle(any());
         verify(client).acknowledgeOperation(any());
     }
 
@@ -81,14 +101,16 @@ public class C2ClientServiceTest {
         when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
         C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
         hbResponse.setRequestedOperations(generateOperation(operationNum));
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
         when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+        when(operationService.getHandlerForOperation(any())).thenReturn(Optional.of(c2OperationHandler));
+        when(c2OperationHandler.handle(any())).thenReturn(new C2OperationAck());
 
         c2ClientService.sendHeartbeat(runtimeInfoWrapper);
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService, times(operationNum)).handleOperation(any());
+        verify(c2OperationHandler, times(operationNum)).handle(any());
         verify(client, times(operationNum)).acknowledgeOperation(any());
     }
 
@@ -102,7 +124,6 @@ public class C2ClientServiceTest {
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService, times(0)).handleOperation(any());
         verify(client, times(0)).acknowledgeOperation(any());
     }
 
@@ -117,7 +138,6 @@ public class C2ClientServiceTest {
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService, times(0)).handleOperation(any());
         verify(client, times(0)).acknowledgeOperation(any());
     }
 
@@ -128,16 +148,121 @@ public class C2ClientServiceTest {
         C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
         hbResponse.setRequestedOperations(generateOperation(1));
         when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
-        when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+        when(operationService.getHandlerForOperation(any())).thenReturn(Optional.empty());
 
         c2ClientService.sendHeartbeat(runtimeInfoWrapper);
 
         verify(c2HeartbeatFactory).create(any());
         verify(client).publishHeartbeat(heartbeat);
-        verify(operationService).handleOperation(any());
         verify(client, times(0)).acknowledgeOperation(any());
     }
 
+    @Test
+    void shouldHeartbeatSendingNotPropagateExceptions() {
+        when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenThrow(new RuntimeException());
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+    }
+
+    @Test
+    void shouldAckSendingNotPropagateExceptions() {
+        C2OperationAck operationAck = mock(C2OperationAck.class);
+        doThrow(new RuntimeException()).when(client).acknowledgeOperation(operationAck);
+
+        c2ClientService.sendAcknowledge(operationAck);
+    }
+
+    @Test
+    void shouldSendAcknowledgeWithoutPersistingOperationsWhenOperationRequiresRestartButHandlerReturnsNonFullyAppliedState() {
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+        C2OperationAck operationAck = new C2OperationAck();
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(OperationState.NOT_APPLIED);
+        operationAck.setOperationState(c2OperationState);
+        when(c2OperationHandler.requiresRestart()).thenReturn(true);
+        when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+        when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(operationAck);
+
+        c2ClientService.handleRequestedOperations(generateOperation(1));
+
+        verify(operationService).getHandlerForOperation(any(C2Operation.class));
+        verify(c2OperationHandler).handle(any(C2Operation.class));
+        verify(requestedOperationDAO).cleanup();
+        verify(client).acknowledgeOperation(operationAck);
+        verifyNoMoreInteractions(operationService, client, requestedOperationDAO);
+        verifyNoInteractions(c2HeartbeatFactory, c2OperationRegister);
+    }
+
+    @Test
+    void shouldSaveOperationQueueIfRestartIsNeededAndThereAreMultipleRequestedOperations() {
+        C2Operation c2Operation1 = new C2Operation();
+        c2Operation1.setIdentifier("1");
+        C2Operation c2Operation2 = new C2Operation();
+        c2Operation2.setIdentifier("2");
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+        when(c2OperationHandler.requiresRestart()).thenReturn(true);
+        when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(OperationState.FULLY_APPLIED);
+        c2OperationAck.setOperationState(c2OperationState);
+        when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
+
+        c2ClientService.handleRequestedOperations(Arrays.asList(c2Operation1, c2Operation2));
+
+        verify(requestedOperationDAO).save(new OperationQueue(c2Operation1, Collections.singletonList(c2Operation2)));
+        verify(c2OperationRegister).accept(c2Operation1);
+        verifyNoInteractions(client, c2HeartbeatFactory);
+        verifyNoMoreInteractions(requestedOperationDAO, c2OperationRegister, operationService);
+    }
+
+    @Test
+    void shouldReEnableHeartbeatsIfExceptionHappensDuringRegisteringOperationAndThereIsNoMoreOperationInQueue() {
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+        C2Operation operation = new C2Operation();
+        when(c2OperationHandler.requiresRestart()).thenReturn(true);
+        when(operationService.getHandlerForOperation(any(C2Operation.class))).thenReturn(Optional.of(c2OperationHandler));
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(OperationState.FULLY_APPLIED);
+        c2OperationAck.setOperationState(c2OperationState);
+        when(c2OperationHandler.handle(any(C2Operation.class))).thenReturn(c2OperationAck);
+        doThrow(new RuntimeException()).when(c2OperationRegister).accept(any(C2Operation.class));
+        c2ClientService.handleRequestedOperations(Collections.singletonList(operation));
+        when(c2HeartbeatFactory.create(runtimeInfoWrapper)).thenReturn(new C2Heartbeat());
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(runtimeInfoWrapper);
+        verify(client).publishHeartbeat(any(C2Heartbeat.class));
+    }
+
+    @Test
+    void shouldContinueWithRemainingOperationsIfExceptionHappensDuringRegisteringOperationAndThereAreMoreOperationsInQueue() {
+        C2OperationHandler c2OperationHandlerForRestart = mock(C2OperationHandler.class);
+        C2OperationHandler c2OperationHandler = mock(C2OperationHandler.class);
+        C2Operation operation1 = new C2Operation();
+        operation1.setIdentifier("1");
+        C2Operation operation2 = new C2Operation();
+        operation2.setIdentifier("2");
+        C2OperationAck c2OperationAck = new C2OperationAck();
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(OperationState.FULLY_APPLIED);
+        c2OperationAck.setOperationState(c2OperationState);
+        when(c2OperationHandler.requiresRestart()).thenReturn(false);
+        when(c2OperationHandlerForRestart.requiresRestart()).thenReturn(true);
+        when(operationService.getHandlerForOperation(operation1)).thenReturn(Optional.of(c2OperationHandlerForRestart));
+        when(operationService.getHandlerForOperation(operation2)).thenReturn(Optional.of(c2OperationHandler));
+        when(c2OperationHandlerForRestart.handle(operation1)).thenReturn(c2OperationAck);
+        when(c2OperationHandler.handle(operation2)).thenReturn(c2OperationAck);
+
+        doThrow(new RuntimeException()).when(c2OperationRegister).accept(operation1);
+
+        c2ClientService.handleRequestedOperations(Arrays.asList(operation1, operation2));
+
+        verify(client, times(2)).acknowledgeOperation(c2OperationAck);
+    }
+
     private List<C2Operation> generateOperation(int num) {
         return IntStream.range(0, num)
             .mapToObj(x -> new C2Operation())
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
similarity index 73%
rename from c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
rename to c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
index d821dd0738..bc6649ec25 100644
--- a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationHandlerProviderTest.java
@@ -34,7 +34,7 @@ import org.apache.nifi.c2.protocol.api.OperationType;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-public class C2OperationServiceTest {
+public class C2OperationHandlerProviderTest {
 
     private static C2OperationAck operationAck;
 
@@ -45,45 +45,46 @@ public class C2OperationServiceTest {
     }
 
     @Test
-    void testHandleOperationReturnsEmptyForUnrecognisedOperationType() {
-        C2OperationService service = new C2OperationService(Collections.emptyList());
+    void testGetHandlerForReturnsEmptyForUnrecognisedOperationType() {
+        C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.emptyList());
 
         C2Operation operation = new C2Operation();
-        operation.setOperation(OperationType.UPDATE);
-        operation.setOperand(CONFIGURATION);
-        Optional<C2OperationAck> ack = service.handleOperation(operation);
+        operation.setOperation(DESCRIBE);
+        operation.setOperand(MANIFEST);
+        Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
 
-        assertFalse(ack.isPresent());
+        assertFalse(handler.isPresent());
     }
 
     @Test
-    void testHandleOperation() {
-        C2OperationService service = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
+    void testGetHandlerForOperationReturnsEmptyForOperandMismatch() {
+        C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.singletonList(new TestInvalidOperationHandler()));
 
         C2Operation operation = new C2Operation();
         operation.setOperation(DESCRIBE);
         operation.setOperand(MANIFEST);
-        Optional<C2OperationAck> ack = service.handleOperation(operation);
+        Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
 
-        assertTrue(ack.isPresent());
-        assertEquals(operationAck, ack.get());
+        assertFalse(handler.isPresent());
     }
 
     @Test
-    void testHandleOperationReturnsEmptyForOperandMismatch() {
-        C2OperationService service = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
+    void testHandleOperation() {
+        TestDescribeOperationHandler describeOperationHandler = new TestDescribeOperationHandler();
+        C2OperationHandlerProvider service = new C2OperationHandlerProvider(Collections.singletonList(describeOperationHandler));
 
         C2Operation operation = new C2Operation();
         operation.setOperation(DESCRIBE);
         operation.setOperand(MANIFEST);
-        Optional<C2OperationAck> ack = service.handleOperation(operation);
+        Optional<C2OperationHandler> handler = service.getHandlerForOperation(operation);
 
-        assertFalse(ack.isPresent());
+        assertTrue(handler.isPresent());
+        assertEquals(describeOperationHandler, handler.get());
     }
 
     @Test
     void testHandlersAreReturned() {
-        C2OperationService service = new C2OperationService(Arrays.asList(new TestDescribeOperationHandler(), new TestInvalidOperationHandler()));
+        C2OperationHandlerProvider service = new C2OperationHandlerProvider(Arrays.asList(new TestDescribeOperationHandler(), new TestInvalidOperationHandler()));
 
         Map<OperationType, Map<OperandType, C2OperationHandler>> handlers = service.getHandlers();
 
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java
new file mode 100644
index 0000000000..b060143dc4
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdatePropertiesOperationHandlerTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
+import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdatePropertiesOperationHandlerTest {
+
+    private static final String ID = "id";
+    private static final Map<String, String> ARGS = Collections.singletonMap("key", "value");
+
+    @Mock
+    private OperandPropertiesProvider operandPropertiesProvider;
+
+    @Mock
+    private Function<Map<String, String>, Boolean> persistProperties;
+
+    @InjectMocks
+    private UpdatePropertiesOperationHandler updatePropertiesOperationHandler;
+
+    @Test
+    void shouldReturnStaticSettings() {
+        assertEquals(UPDATE, updatePropertiesOperationHandler.getOperationType());
+        assertEquals(PROPERTIES, updatePropertiesOperationHandler.getOperandType());
+        assertTrue(updatePropertiesOperationHandler.requiresRestart());
+    }
+
+    @Test
+    void shouldReturnProperties() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("test", new Object());
+        when(operandPropertiesProvider.getProperties()).thenReturn(properties);
+
+        Map<String, Object> result = updatePropertiesOperationHandler.getProperties();
+
+        assertEquals(properties, result);
+    }
+
+    @Test
+    void shouldReturnAckWithFullyAppliedWhenPersistIsSuccessful() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenReturn(true);
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.FULLY_APPLIED), result);
+    }
+
+    @Test
+    void shouldReturnAckWithNoOperationWhenPersistReturnFalse() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenReturn(false);
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.NO_OPERATION), result);
+    }
+
+    @Test
+    void shouldReturnNotAppliedInCaseOfIllegalArgumentException() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenThrow(new IllegalArgumentException());
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        assertEquals(getExpected(OperationState.NOT_APPLIED), result);
+    }
+
+    @Test
+    void shouldReturnNotAppliedInCaseOfException() {
+        C2Operation c2Operation = getC2Operation();
+        when(persistProperties.apply(ARGS)).thenThrow(new RuntimeException());
+
+        C2OperationAck result = updatePropertiesOperationHandler.handle(c2Operation);
+
+        C2OperationAck expected = getExpected(OperationState.NOT_APPLIED);
+        expected.getOperationState().setDetails("Failed to persist properties");
+        assertEquals(expected, result);
+    }
+
+    private C2OperationAck getExpected(OperationState operationState) {
+        final C2OperationAck c2OperationAck = new C2OperationAck();
+        c2OperationAck.setOperationId(ID);
+        C2OperationState c2OperationState = new C2OperationState();
+        c2OperationState.setState(operationState);
+        c2OperationAck.setOperationState(c2OperationState);
+        return c2OperationAck;
+    }
+
+    private C2Operation getC2Operation() {
+        C2Operation c2Operation = new C2Operation();
+        c2Operation.setArgs(ARGS);
+        c2Operation.setIdentifier(ID);
+        return c2Operation;
+    }
+}
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
index 8828d77fbc..5bd7de1d47 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2Operation.java
@@ -23,6 +23,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 @ApiModel
@@ -104,4 +105,32 @@ public class C2Operation implements Serializable {
         this.dependencies = dependencies;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        C2Operation operation1 = (C2Operation) o;
+        return Objects.equals(identifier, operation1.identifier) && operation == operation1.operation && operand == operation1.operand && Objects.equals(args,
+            operation1.args) && Objects.equals(dependencies, operation1.dependencies);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifier, operation, operand, args, dependencies);
+    }
+
+    @Override
+    public String toString() {
+        return "C2Operation{" +
+            "identifier='" + identifier + '\'' +
+            ", operation=" + operation +
+            ", operand=" + operand +
+            ", args=" + args +
+            ", dependencies=" + dependencies +
+            '}';
+    }
 }
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
index a12d62e5b7..d74967e34f 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationAck.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
+import java.util.Objects;
 
 @ApiModel
 public class C2OperationAck implements Serializable {
@@ -81,4 +82,33 @@ public class C2OperationAck implements Serializable {
     public void setFlowInfo(final FlowInfo flowInfo) {
         this.flowInfo = flowInfo;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        C2OperationAck that = (C2OperationAck) o;
+        return Objects.equals(operationId, that.operationId) && Objects.equals(operationState, that.operationState) && Objects.equals(deviceInfo,
+            that.deviceInfo) && Objects.equals(agentInfo, that.agentInfo) && Objects.equals(flowInfo, that.flowInfo);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(operationId, operationState, deviceInfo, agentInfo, flowInfo);
+    }
+
+    @Override
+    public String toString() {
+        return "C2OperationAck{" +
+            "operationId='" + operationId + '\'' +
+            ", operationState=" + operationState +
+            ", deviceInfo=" + deviceInfo +
+            ", agentInfo=" + agentInfo +
+            ", flowInfo=" + flowInfo +
+            '}';
+    }
 }
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
index 21a44b582c..dc4e303e43 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/C2OperationState.java
@@ -20,6 +20,7 @@ package org.apache.nifi.c2.protocol.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * Simple model of operations. The current approach is to capture a shared state ( via agent(s)
@@ -69,6 +70,30 @@ public class C2OperationState implements Serializable {
         this.state = OperationState.fromOrdinal(state);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        C2OperationState that = (C2OperationState) o;
+        return state == that.state && Objects.equals(details, that.details);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(state, details);
+    }
+
+    @Override
+    public String toString() {
+        return "C2OperationState{" +
+            "state=" + state +
+            ", details='" + details + '\'' +
+            '}';
+    }
 
     public enum OperationState {
         FULLY_APPLIED,
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
index 1f1cb19035..bc9538d9c3 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
@@ -27,6 +27,7 @@ public enum OperandType {
     DEBUG,
     MANIFEST,
     REPOSITORY,
+    PROPERTIES,
     ASSET;
 
     public static Optional<OperandType> fromString(String value) {
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
index 9ee3630aef..766bc6d303 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
+++ b/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperationType.java
@@ -22,6 +22,7 @@ import static org.apache.nifi.c2.protocol.api.OperandType.CONFIGURATION;
 import static org.apache.nifi.c2.protocol.api.OperandType.CONNECTION;
 import static org.apache.nifi.c2.protocol.api.OperandType.DEBUG;
 import static org.apache.nifi.c2.protocol.api.OperandType.MANIFEST;
+import static org.apache.nifi.c2.protocol.api.OperandType.PROPERTIES;
 
 import java.util.Arrays;
 import java.util.Set;
@@ -36,7 +37,7 @@ public enum OperationType {
     // C2 Server -> C2 Client Commands
     CLEAR(CONNECTION),
     DESCRIBE(MANIFEST),
-    UPDATE(CONFIGURATION, ASSET),
+    UPDATE(CONFIGURATION, ASSET, PROPERTIES),
     RESTART,
     START,
     STOP,
diff --git a/minifi/minifi-bootstrap/pom.xml b/minifi/minifi-bootstrap/pom.xml
index f77d58d9cc..5ad82482df 100644
--- a/minifi/minifi-bootstrap/pom.xml
+++ b/minifi/minifi-bootstrap/pom.xml
@@ -87,6 +87,11 @@ limitations under the License.
             <artifactId>minifi-commons-schema</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-commons-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
index 6958bb2498..1c141547a0 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java
@@ -44,6 +44,7 @@ import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
 import org.apache.nifi.minifi.bootstrap.service.ReloadService;
 import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
 import org.apache.nifi.minifi.bootstrap.util.UnixProcessUtils;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,6 +75,7 @@ public class RunMiNiFi implements ConfigurationFileHolder {
     public static final int UNINITIALIZED = -1;
     private static final String STATUS_FILE_PORT_KEY = "port";
     private static final String STATUS_FILE_SECRET_KEY = "secret.key";
+    private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
 
     private final BootstrapFileProvider bootstrapFileProvider;
     private final ConfigurationChangeCoordinator configurationChangeCoordinator;
@@ -87,10 +89,14 @@ public class RunMiNiFi implements ConfigurationFileHolder {
     private final Lock startedLock = new ReentrantLock();
     // Is set to true after the MiNiFi instance shuts down in preparation to be reloaded. Will be set to false after MiNiFi is successfully started again.
     private final AtomicBoolean reloading = new AtomicBoolean(false);
+    private final AtomicBoolean commandInProgress = new AtomicBoolean(false);
+    private final MiNiFiCommandSender miNiFiCommandSender;
+    private final CurrentPortProvider currentPortProvider;
+    private final ObjectMapper objectMapper;
 
     public RunMiNiFi(File bootstrapConfigFile) throws IOException {
         bootstrapFileProvider = new BootstrapFileProvider(bootstrapConfigFile);
-
+        objectMapper = getObjectMapper();
         Properties properties = bootstrapFileProvider.getStatusProperties();
 
         miNiFiParameters = new MiNiFiParameters(
@@ -100,20 +106,19 @@ public class RunMiNiFi implements ConfigurationFileHolder {
         );
         ProcessUtils processUtils = new UnixProcessUtils();
 
-        MiNiFiCommandSender miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, getObjectMapper());
+        miNiFiCommandSender = new MiNiFiCommandSender(miNiFiParameters, objectMapper);
         MiNiFiStatusProvider miNiFiStatusProvider = new MiNiFiStatusProvider(miNiFiCommandSender, processUtils);
         periodicStatusReporterManager =
             new PeriodicStatusReporterManager(bootstrapFileProvider.getBootstrapProperties(), miNiFiStatusProvider, miNiFiCommandSender, miNiFiParameters);
-        configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider.getBootstrapProperties(), this,
-            singleton(new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider)));
-
+        MiNiFiConfigurationChangeListener configurationChangeListener = new MiNiFiConfigurationChangeListener(this, DEFAULT_LOGGER, bootstrapFileProvider);
+        configurationChangeCoordinator = new ConfigurationChangeCoordinator(bootstrapFileProvider, this, singleton(configurationChangeListener));
 
-        CurrentPortProvider currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
+        currentPortProvider = new CurrentPortProvider(miNiFiCommandSender, miNiFiParameters, processUtils);
         GracefulShutdownParameterProvider gracefulShutdownParameterProvider = new GracefulShutdownParameterProvider(bootstrapFileProvider);
         reloadService = new ReloadService(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, this, processUtils);
         commandRunnerFactory = new CommandRunnerFactory(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager,
             bootstrapFileProvider, new MiNiFiStdLogHandler(), bootstrapConfigFile, this, gracefulShutdownParameterProvider,
-            new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils);
+            new MiNiFiExecCommandProvider(bootstrapFileProvider), processUtils, configurationChangeListener);
     }
 
     public int run(BootstrapCommand command, String... args) {
@@ -212,6 +217,18 @@ public class RunMiNiFi implements ConfigurationFileHolder {
         configurationChangeCoordinator.close();
     }
 
+    public void sendAcknowledgeToMiNiFi(MiNiFiCommandState commandState) {
+        try {
+            if (commandInProgress.getAndSet(false)) {
+                Integer currentPort = currentPortProvider.getCurrentPort();
+                CMD_LOGGER.info("Sending acknowledge with state {} to MiNiFi on port {}", commandState, currentPort);
+                miNiFiCommandSender.sendCommand(ACKNOWLEDGE_OPERATION, currentPort, commandState.name());
+            }
+        } catch (Exception e) {
+            CMD_LOGGER.error("Failed to send Acknowledge to MiNiFi", e);
+        }
+    }
+
     public PeriodicStatusReporterManager getPeriodicStatusReporterManager() {
         return periodicStatusReporterManager;
     }
@@ -245,7 +262,10 @@ public class RunMiNiFi implements ConfigurationFileHolder {
         ObjectMapper objectMapper = new ObjectMapper();
         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
         objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-        objectMapper.enable(DeserializationFeature.READ_ENUMS_USING_TO_STRING);
         return objectMapper;
     }
+
+    public void setCommandInProgress(boolean value) {
+        commandInProgress.set(value);
+    }
 }
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
index 33902e36f8..730995f76f 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactory.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.nifi.minifi.bootstrap.BootstrapCommand;
 import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
 import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
 import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
@@ -47,11 +48,13 @@ public class CommandRunnerFactory {
     private final GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
     private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
     private final ProcessUtils processUtils;
+    private final ConfigurationChangeListener configurationChangeListener;
 
     public CommandRunnerFactory(MiNiFiCommandSender miNiFiCommandSender, CurrentPortProvider currentPortProvider, MiNiFiParameters miNiFiParameters,
         MiNiFiStatusProvider miNiFiStatusProvider, PeriodicStatusReporterManager periodicStatusReporterManager,
         BootstrapFileProvider bootstrapFileProvider, MiNiFiStdLogHandler miNiFiStdLogHandler, File bootstrapConfigFile, RunMiNiFi runMiNiFi,
-        GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
+        GracefulShutdownParameterProvider gracefulShutdownParameterProvider, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils,
+        ConfigurationChangeListener configurationChangeListener) {
         this.miNiFiCommandSender = miNiFiCommandSender;
         this.currentPortProvider = currentPortProvider;
         this.miNiFiParameters = miNiFiParameters;
@@ -64,6 +67,7 @@ public class CommandRunnerFactory {
         this.gracefulShutdownParameterProvider = gracefulShutdownParameterProvider;
         this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
         this.processUtils = processUtils;
+        this.configurationChangeListener = configurationChangeListener;
     }
 
     /**
@@ -77,7 +81,7 @@ public class CommandRunnerFactory {
             case START:
             case RUN:
                 commandRunner = new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
-                    bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils);
+                    bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, configurationChangeListener);
                 break;
             case STOP:
                 commandRunner = new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils);
@@ -107,7 +111,7 @@ public class CommandRunnerFactory {
         List<CommandRunner> compositeList = new LinkedList<>();
         compositeList.add(new StopRunner(bootstrapFileProvider, miNiFiParameters, miNiFiCommandSender, currentPortProvider, gracefulShutdownParameterProvider, processUtils));
         compositeList.add(new StartRunner(currentPortProvider, bootstrapFileProvider, periodicStatusReporterManager, miNiFiStdLogHandler, miNiFiParameters,
-            bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, processUtils));
+            bootstrapConfigFile, runMiNiFi, miNiFiExecCommandProvider, configurationChangeListener));
         return compositeList;
     }
 }
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
index 9b29360d54..a150fa0c48 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/command/StartRunner.java
@@ -18,13 +18,17 @@
 package org.apache.nifi.minifi.bootstrap.command;
 
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.FULLY_APPLIED;
+import static org.apache.nifi.minifi.commons.api.MiNiFiCommandState.NOT_APPLIED_WITH_RESTART;
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CMD_LOGGER;
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.DEFAULT_LOGGER;
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.MINIFI_CONFIG_FILE_KEY;
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.STATUS_FILE_PID_KEY;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.UNINITIALIZED;
 import static org.apache.nifi.minifi.bootstrap.Status.ERROR;
 import static org.apache.nifi.minifi.bootstrap.Status.OK;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
 import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
 
 import java.io.File;
@@ -50,6 +54,7 @@ import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.ShutdownHook;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.exception.StartupFailureException;
 import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
 import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
@@ -57,8 +62,6 @@ import org.apache.nifi.minifi.bootstrap.service.MiNiFiExecCommandProvider;
 import org.apache.nifi.minifi.bootstrap.service.MiNiFiListener;
 import org.apache.nifi.minifi.bootstrap.service.MiNiFiStdLogHandler;
 import org.apache.nifi.minifi.bootstrap.service.PeriodicStatusReporterManager;
-import org.apache.nifi.minifi.bootstrap.util.ProcessUtils;
-import org.apache.nifi.util.Tuple;
 
 public class StartRunner implements CommandRunner {
     private static final int STARTUP_WAIT_SECONDS = 60;
@@ -76,11 +79,13 @@ public class StartRunner implements CommandRunner {
     private final RunMiNiFi runMiNiFi;
     private volatile ShutdownHook shutdownHook;
     private final MiNiFiExecCommandProvider miNiFiExecCommandProvider;
-    private final ProcessUtils processUtils;
+    private final ConfigurationChangeListener configurationChangeListener;
+
+    private int listenPort;
 
     public StartRunner(CurrentPortProvider currentPortProvider, BootstrapFileProvider bootstrapFileProvider,
         PeriodicStatusReporterManager periodicStatusReporterManager, MiNiFiStdLogHandler miNiFiStdLogHandler, MiNiFiParameters miNiFiParameters, File bootstrapConfigFile,
-        RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ProcessUtils processUtils) {
+        RunMiNiFi runMiNiFi, MiNiFiExecCommandProvider miNiFiExecCommandProvider, ConfigurationChangeListener configurationChangeListener) {
         this.currentPortProvider = currentPortProvider;
         this.bootstrapFileProvider = bootstrapFileProvider;
         this.periodicStatusReporterManager = periodicStatusReporterManager;
@@ -89,7 +94,7 @@ public class StartRunner implements CommandRunner {
         this.bootstrapConfigFile = bootstrapConfigFile;
         this.runMiNiFi = runMiNiFi;
         this.miNiFiExecCommandProvider = miNiFiExecCommandProvider;
-        this.processUtils = processUtils;
+        this.configurationChangeListener = configurationChangeListener;
     }
 
     /**
@@ -129,9 +134,7 @@ public class StartRunner implements CommandRunner {
         String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
         initConfigFiles(bootstrapProperties, confDir);
 
-        Tuple<ProcessBuilder, Process> tuple = startMiNiFi();
-        ProcessBuilder builder = tuple.getKey();
-        Process process = tuple.getValue();
+        Process process = startMiNiFi();
 
         try {
             while (true) {
@@ -152,8 +155,7 @@ public class StartRunner implements CommandRunner {
                             Thread.sleep(5000L);
                             continue;
                         }
-
-                        process = restartNifi(bootstrapProperties, confDir, builder);
+                        process = restartMiNifi(bootstrapProperties, confDir);
                         // failed to start process
                         if (process == null) {
                             return;
@@ -170,44 +172,39 @@ public class StartRunner implements CommandRunner {
         }
     }
 
-    private Process restartNifi(Properties bootstrapProperties, String confDir, ProcessBuilder builder) throws IOException {
+    private Process restartMiNifi(Properties bootstrapProperties, String confDir) throws IOException {
         Process process;
         boolean previouslyStarted = runMiNiFi.isNiFiStarted();
+        boolean configChangeSuccessful = true;
         if (!previouslyStarted) {
-            File swapConfigFile = bootstrapFileProvider.getSwapFile();
+            File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
+            File bootstrapSwapConfigFile = bootstrapFileProvider.getBootstrapConfSwapFile();
             if (swapConfigFile.exists()) {
-                DEFAULT_LOGGER.info("Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
-
-                try {
-                    ByteBuffer tempConfigFile = generateConfigFiles(new FileInputStream(swapConfigFile), confDir, bootstrapProperties);
-                    runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
-                } catch (ConfigurationChangeException e) {
-                    DEFAULT_LOGGER.error("The swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+                if (!revertFlowConfig(bootstrapProperties, confDir, swapConfigFile)) {
                     return null;
                 }
-
-                Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
-
-                DEFAULT_LOGGER.info("Replacing config file with swap file and deleting swap file");
-                if (!swapConfigFile.delete()) {
-                    DEFAULT_LOGGER.warn("The swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+            } else if(bootstrapSwapConfigFile.exists()) {
+                if (!revertBootstrapConfig(confDir, bootstrapSwapConfigFile)) {
+                    return null;
                 }
-                runMiNiFi.setReloading(false);
             } else {
                 DEFAULT_LOGGER.info("MiNiFi either never started or failed to restart. Will not attempt to restart MiNiFi");
                 return null;
             }
+            configChangeSuccessful = false;
         } else {
             runMiNiFi.setNiFiStarted(false);
         }
 
         miNiFiParameters.setSecretKey(null);
 
-        process = startMiNiFiProcess(builder);
+        CMD_LOGGER.info("Restarting Apache MiNiFi...");
+        process = startMiNiFiProcess(getProcessBuilder());
 
         boolean started = waitForStart();
 
         if (started) {
+            runMiNiFi.sendAcknowledgeToMiNiFi(configChangeSuccessful ? FULLY_APPLIED : NOT_APPLIED_WITH_RESTART);
             Long pid = OSUtils.getProcessId(process, DEFAULT_LOGGER);
             DEFAULT_LOGGER.info("Successfully spawned the thread to start Apache MiNiFi{}", (pid == null ? "" : " with PID " + pid));
         } else {
@@ -216,6 +213,46 @@ public class StartRunner implements CommandRunner {
         return process;
     }
 
+    private boolean revertFlowConfig(Properties bootstrapProperties, String confDir, File swapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Flow Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(Files.newInputStream(swapConfigFile.toPath()), confDir, bootstrapProperties);
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The flow swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        Files.copy(swapConfigFile.toPath(), Paths.get(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)), REPLACE_EXISTING);
+
+        DEFAULT_LOGGER.info("Replacing flow config file with swap file and deleting swap file");
+        if (!swapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The flow swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
+    private boolean revertBootstrapConfig(String confDir, File bootstrapSwapConfigFile) throws IOException {
+        DEFAULT_LOGGER.info("Bootstrap Swap file exists, MiNiFi failed trying to change configuration. Reverting to old configuration.");
+
+        Files.copy(bootstrapSwapConfigFile.toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+        try {
+            ByteBuffer tempConfigFile = generateConfigFiles(asByteArrayInputStream(runMiNiFi.getConfigFileReference().get().duplicate()), confDir, bootstrapFileProvider.getBootstrapProperties());
+            runMiNiFi.getConfigFileReference().set(tempConfigFile.asReadOnlyBuffer());
+        } catch (ConfigurationChangeException e) {
+            DEFAULT_LOGGER.error("The bootstrap swap file is malformed, unable to restart from prior state. Will not attempt to restart MiNiFi. Swap File should be cleaned up manually.");
+            return false;
+        }
+
+        if (!bootstrapSwapConfigFile.delete()) {
+            DEFAULT_LOGGER.warn("The bootstrap swap file failed to delete after replacing using it to revert to the old configuration. It should be cleaned up manually.");
+        }
+        runMiNiFi.setReloading(false);
+        return true;
+    }
+
     private boolean needRestart() throws IOException {
         boolean needRestart = true;
         File statusFile = bootstrapFileProvider.getStatusFile();
@@ -236,16 +273,8 @@ public class StartRunner implements CommandRunner {
         try {
             Thread.sleep(1000L);
             if (runMiNiFi.getReloading() && runMiNiFi.isNiFiStarted()) {
-                File swapConfigFile = bootstrapFileProvider.getSwapFile();
-                if (swapConfigFile.exists()) {
-                    DEFAULT_LOGGER.info("MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration.");
-
-                    if (swapConfigFile.delete()) {
-                        DEFAULT_LOGGER.info("Swap file was successfully deleted.");
-                    } else {
-                        DEFAULT_LOGGER.error("Swap file was not deleted. It should be deleted manually.");
-                    }
-                }
+                deleteSwapFile(bootstrapFileProvider.getConfigYmlSwapFile());
+                deleteSwapFile(bootstrapFileProvider.getBootstrapConfSwapFile());
                 runMiNiFi.setReloading(false);
             }
         } catch (InterruptedException ie) {
@@ -253,6 +282,18 @@ public class StartRunner implements CommandRunner {
         }
     }
 
+    private void deleteSwapFile(File file) {
+        if (file.exists()) {
+            DEFAULT_LOGGER.info("MiNiFi has finished reloading successfully and {} file exists. Deleting old configuration.", file.getName());
+
+            if (file.delete()) {
+                DEFAULT_LOGGER.info("Swap file ({}) was successfully deleted.", file.getName());
+            } else {
+                DEFAULT_LOGGER.error("Swap file ({}) was not deleted. It should be deleted manually.", file.getAbsoluteFile());
+            }
+        }
+    }
+
     private void initConfigFiles(Properties bootstrapProperties, String confDir) throws IOException {
         File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
         try (InputStream inputStream = new FileInputStream(configFile)) {
@@ -269,27 +310,34 @@ public class StartRunner implements CommandRunner {
         }
     }
 
-    private Tuple<ProcessBuilder, Process> startMiNiFi() throws IOException {
-        ProcessBuilder builder = new ProcessBuilder();
+    private Process startMiNiFi() throws IOException {
 
-        File workingDir = getWorkingDir();
         MiNiFiListener listener = new MiNiFiListener();
-        int listenPort = listener.start(runMiNiFi);
+        listenPort = listener.start(runMiNiFi, bootstrapFileProvider, configurationChangeListener);
+
+        CMD_LOGGER.info("Starting Apache MiNiFi...");
+
+        return startMiNiFiProcess(getProcessBuilder());
+    }
+
+    private ProcessBuilder getProcessBuilder() throws IOException{
+        ProcessBuilder builder = new ProcessBuilder();
+        File workingDir = getWorkingDir();
+
         List<String> cmd = miNiFiExecCommandProvider.getMiNiFiExecCommand(listenPort, workingDir);
 
         builder.command(cmd);
         builder.directory(workingDir);
-
-        CMD_LOGGER.info("Starting Apache MiNiFi...");
-        CMD_LOGGER.info("Working Directory: {}", workingDir.getAbsolutePath());
+        CMD_LOGGER.debug("Working Directory: {}", workingDir.getAbsolutePath());
         CMD_LOGGER.info("Command: {}", String.join(" ", cmd));
-
-        return new Tuple<>(builder, startMiNiFiProcess(builder));
+        return builder;
     }
 
     private Process startMiNiFiProcess(ProcessBuilder builder) throws IOException {
         Process process = builder.start();
         miNiFiStdLogHandler.initLogging(process);
+        miNiFiParameters.setMiNiFiPort(UNINITIALIZED);
+        miNiFiParameters.setMinifiPid(UNINITIALIZED);
         Long pid = OSUtils.getProcessId(process, CMD_LOGGER);
         if (pid != null) {
             miNiFiParameters.setMinifiPid(pid);
@@ -317,8 +365,9 @@ public class StartRunner implements CommandRunner {
         lock.lock();
         try {
             long startTime = System.nanoTime();
-
-            while (miNiFiParameters.getMinifiPid() < 1 && miNiFiParameters.getMiNiFiPort() < 1) {
+            while (miNiFiParameters.getMinifiPid() < 1 && miNiFiParameters.getMiNiFiPort() < 1 || !runMiNiFi.isNiFiStarted()) {
+                DEFAULT_LOGGER.debug("Waiting MiNiFi to start Pid={}, port={}, isNifiStarted={}",
+                    miNiFiParameters.getMinifiPid(), miNiFiParameters.getMiNiFiPort(), runMiNiFi.isNiFiStarted());
                 try {
                     startupCondition.await(1, TimeUnit.SECONDS);
                 } catch (InterruptedException ie) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
index 8bf255f07c..7f3a0a8dce 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java
@@ -28,6 +28,7 @@ import java.util.Properties;
 import java.util.Set;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor;
+import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
 import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +42,12 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
     private final Set<ConfigurationChangeListener> configurationChangeListeners;
     private final Set<ChangeIngestor> changeIngestors = new HashSet<>();
 
-    private final Properties bootstrapProperties;
+    private final BootstrapFileProvider bootstrapFileProvider;
     private final RunMiNiFi runMiNiFi;
 
-    public ConfigurationChangeCoordinator(Properties bootstrapProperties, RunMiNiFi runMiNiFi,
+    public ConfigurationChangeCoordinator(BootstrapFileProvider bootstrapFileProvider, RunMiNiFi runMiNiFi,
         Set<ConfigurationChangeListener> miNiFiConfigurationChangeListeners) {
-        this.bootstrapProperties = bootstrapProperties;
+        this.bootstrapFileProvider = bootstrapFileProvider;
         this.runMiNiFi = runMiNiFi;
         this.configurationChangeListeners = Optional.ofNullable(miNiFiConfigurationChangeListeners).map(Collections::unmodifiableSet).orElse(Collections.emptySet());
     }
@@ -54,7 +55,7 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
     /**
      * Begins the associated notification service provided by the given implementation.  In most implementations, no action will occur until this method is invoked.
      */
-    public void start() {
+    public void start() throws IOException{
         initialize();
         changeIngestors.forEach(ChangeIngestor::start);
     }
@@ -102,8 +103,9 @@ public class ConfigurationChangeCoordinator implements Closeable, ConfigurationC
         }
     }
 
-    private void initialize() {
+    private void initialize() throws IOException {
         close();
+        Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
         // cleanup previously initialized ingestors
         String ingestorsCsv = bootstrapProperties.getProperty(NOTIFIER_INGESTORS_KEY);
 
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
index fa65aa0378..b2cd1bd77a 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/Differentiator.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
 
 import java.io.IOException;
-import java.util.Properties;
 import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
 
 /**
@@ -31,10 +30,9 @@ public interface Differentiator <T> {
     /**
      * Initialise the differentiator with the initial configuration
      *
-     * @param properties the properties to be used
      * @param configurationFileHolder holder for the config file
      */
-    void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder);
+    void initialize(ConfigurationFileHolder configurationFileHolder);
 
     /**
      * Determine whether the config file changed
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
index e86fe632ef..5a081d33b6 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiator.java
@@ -17,17 +17,15 @@
 
 package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
 
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public abstract class WholeConfigDifferentiator {
 
@@ -59,7 +57,7 @@ public abstract class WholeConfigDifferentiator {
         }
     }
 
-    public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) {
+    public void initialize(ConfigurationFileHolder configurationFileHolder) {
         this.configurationFileHolder = configurationFileHolder;
     }
 
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
index d7ae75a877..97260aa026 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestor.java
@@ -173,7 +173,7 @@ public class FileChangeIngestor implements Runnable, ChangeIngestor {
         } else {
             differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
         }
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
     }
 
     protected void setConfigFilePath(Path configFilePath) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
index 14000d9e6b..f321f19b1f 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestor.java
@@ -196,7 +196,7 @@ public class PullHttpChangeIngestor extends AbstractPullChangeIngestor {
         } else {
             differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
         }
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
     }
 
 
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
index cb9ded9762..fe0ccbf548 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestor.java
@@ -111,7 +111,7 @@ public class RestChangeIngestor implements ChangeIngestor {
         } else {
             differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
         }
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
 
         // create the secure connector if keystore location is specified
         if (properties.getProperty(KEYSTORE_LOCATION_KEY) != null) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
index c64bc547cf..9d95f941f0 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodec.java
@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
 import java.util.Arrays;
 import java.util.Optional;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.exception.InvalidCommandException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,24 +38,27 @@ public class BootstrapCodec {
     private static final String FALSE = Boolean.FALSE.toString();
 
     private final RunMiNiFi runner;
-    private final BufferedReader reader;
-    private final BufferedWriter writer;
     private final Logger logger = LoggerFactory.getLogger(BootstrapCodec.class);
+    private final UpdatePropertiesService updatePropertiesService;
+    private final UpdateConfigurationService updateConfigurationService;
 
-    public BootstrapCodec(RunMiNiFi runner, InputStream in, OutputStream out) {
+    public BootstrapCodec(RunMiNiFi runner, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) {
         this.runner = runner;
-        this.reader = new BufferedReader(new InputStreamReader(in));
-        this.writer = new BufferedWriter(new OutputStreamWriter(out));
+        this.updatePropertiesService = new UpdatePropertiesService(runner, logger, bootstrapFileProvider);
+        this.updateConfigurationService = new UpdateConfigurationService(runner, configurationChangeListener, bootstrapFileProvider);
     }
 
-    public void communicate() throws IOException {
+    public void communicate(InputStream in, OutputStream out) throws IOException {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+
         String line = reader.readLine();
         String[] splits = Optional.ofNullable(line).map(l -> l.split(" ")).orElse(new String[0]);
         if (splits.length == 0) {
             throw new IOException("Received invalid command from MiNiFi: " + line);
         }
 
-        String cmd = splits[0];
+        BootstrapCommand cmd = BootstrapCommand.safeValueOf(splits[0]);
         String[] args;
         if (splits.length == 1) {
             args = new String[0];
@@ -63,45 +67,65 @@ public class BootstrapCodec {
         }
 
         try {
-            processRequest(cmd, args);
+            processRequest(cmd, args, writer);
         } catch (InvalidCommandException exception) {
             throw new IOException("Received invalid command from MiNiFi: " + line, exception);
         }
     }
 
-    private void processRequest(String cmd, String[] args) throws InvalidCommandException, IOException {
+    private void processRequest(BootstrapCommand cmd, String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
         switch (cmd) {
-            case "PORT":
-                handlePortCommand(args);
+            case PORT:
+                handlePortCommand(args, writer);
+                break;
+            case STARTED:
+                handleStartedCommand(args, writer);
+                break;
+            case SHUTDOWN:
+                handleShutDownCommand(writer);
                 break;
-            case "STARTED":
-                handleStartedCommand(args);
+            case RELOAD:
+                handleReloadCommand(writer);
                 break;
-            case "SHUTDOWN":
-                handleShutDownCommand();
+            case UPDATE_PROPERTIES:
+                handlePropertiesUpdateCommand(writer);
                 break;
-            case "RELOAD":
-                handleReloadCommand();
+            case UPDATE_CONFIGURATION:
+                handleUpdateConfigurationCommand(writer);
                 break;
             default:
                 throw new InvalidCommandException("Unknown command: " + cmd);
         }
     }
 
-    private void handleReloadCommand() throws IOException {
+    private void handleUpdateConfigurationCommand(BufferedWriter writer) throws IOException {
+        logger.debug("Received 'UPDATE_CONFIGURATION' command from MINIFI");
+        writeOk(writer);
+        runner.setCommandInProgress(true);
+        updateConfigurationService.handleUpdate().ifPresent(runner::sendAcknowledgeToMiNiFi);
+    }
+
+    private void handlePropertiesUpdateCommand(BufferedWriter writer) throws IOException {
+        logger.debug("Received 'UPDATE_PROPERTIES' command from MINIFI");
+        writeOk(writer);
+        runner.setCommandInProgress(true);
+        updatePropertiesService.handleUpdate().ifPresent(runner::sendAcknowledgeToMiNiFi);
+    }
+
+    private void handleReloadCommand(BufferedWriter writer) throws IOException {
         logger.debug("Received 'RELOAD' command from MINIFI");
-        writeOk();
+        writeOk(writer);
     }
 
-    private void handleShutDownCommand() throws IOException {
+    private void handleShutDownCommand(BufferedWriter writer) throws IOException {
         logger.debug("Received 'SHUTDOWN' command from MINIFI");
+        writeOk(writer);
         runner.shutdownChangeNotifier();
         runner.getPeriodicStatusReporterManager().shutdownPeriodicStatusReporters();
-        writeOk();
     }
 
-    private void handleStartedCommand(String[] args) throws InvalidCommandException, IOException {
-        logger.debug("Received 'STARTED' command from MINIFI");
+    private void handleStartedCommand(String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
+        logger.info("Received 'STARTED' command from MINIFI");
         if (args.length != 1) {
             throw new InvalidCommandException("STARTED command must contain a status argument");
         }
@@ -110,15 +134,15 @@ public class BootstrapCodec {
             throw new InvalidCommandException("Invalid status for STARTED command; should be true or false, but was '" + args[0] + "'");
         }
 
+        writeOk(writer);
         runner.getPeriodicStatusReporterManager().shutdownPeriodicStatusReporters();
         runner.getPeriodicStatusReporterManager().startPeriodicNotifiers();
         runner.getConfigurationChangeCoordinator().start();
 
         runner.setNiFiStarted(Boolean.parseBoolean(args[0]));
-        writeOk();
     }
 
-    private void handlePortCommand(String[] args) throws InvalidCommandException, IOException {
+    private void handlePortCommand(String[] args, BufferedWriter writer) throws InvalidCommandException, IOException {
         logger.debug("Received 'PORT' command from MINIFI");
         if (args.length != 2) {
             throw new InvalidCommandException("PORT command must contain the port and secretKey arguments");
@@ -135,13 +159,25 @@ public class BootstrapCodec {
             throw new InvalidCommandException("Invalid Port number; should be integer between 1 and 65535");
         }
 
+        writeOk(writer);
         runner.setMiNiFiParameters(port, args[1]);
-        writeOk();
     }
 
-    private void writeOk() throws IOException {
+    private void writeOk(BufferedWriter writer) throws IOException {
         writer.write("OK");
         writer.newLine();
         writer.flush();
     }
+
+    private enum BootstrapCommand {
+        PORT, STARTED, SHUTDOWN, RELOAD, UPDATE_PROPERTIES, UPDATE_CONFIGURATION, UNKNOWN;
+
+        public static BootstrapCommand safeValueOf(String value) {
+            try {
+                return BootstrapCommand.valueOf(value);
+            } catch (IllegalArgumentException e) {
+                return UNKNOWN;
+            }
+        }
+    }
 }
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
index 67f57b4948..ee94602aea 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/BootstrapFileProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.minifi.bootstrap.service;
 
 import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.STATUS_FILE_PID_KEY;
 import static org.apache.nifi.minifi.bootstrap.SensitiveProperty.SENSITIVE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -93,7 +94,7 @@ public class BootstrapFileProvider {
         return reloadFile;
     }
 
-    public File getSwapFile() {
+    public File getConfigYmlSwapFile() {
         File confDir = bootstrapConfigFile.getParentFile();
         File swapFile = new File(confDir, "swap.yml");
 
@@ -101,6 +102,22 @@ public class BootstrapFileProvider {
         return swapFile;
     }
 
+    public File getBootstrapConfSwapFile() {
+        File confDir = bootstrapConfigFile.getParentFile();
+        File swapFile = new File(confDir, "bootstrap-swap.conf");
+
+        LOGGER.debug("Bootstrap Swap File: {}", swapFile);
+        return swapFile;
+    }
+
+    public File getBootstrapConfNewFile() {
+        File confDir = bootstrapConfigFile.getParentFile();
+        File newFile = new File(confDir, BOOTSTRAP_UPDATED_FILE_NAME);
+
+        LOGGER.debug("Bootstrap new File: {}", newFile);
+        return newFile;
+    }
+
     public Properties getBootstrapProperties() throws IOException {
         if (!bootstrapConfigFile.exists()) {
             throw new FileNotFoundException(bootstrapConfigFile.getAbsolutePath());
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
index c0343c9f5e..7800d4b28a 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiConfigurationChangeListener.java
@@ -61,7 +61,7 @@ public class MiNiFiConfigurationChangeListener implements ConfigurationChangeLis
             Properties bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
             File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
 
-            File swapConfigFile = bootstrapFileProvider.getSwapFile();
+            File swapConfigFile = bootstrapFileProvider.getConfigYmlSwapFile();
             logger.info("Persisting old configuration to {}", swapConfigFile.getAbsolutePath());
 
             try (FileInputStream configFileInputStream = new FileInputStream(configFile)) {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
index e9a8eaff64..90f90201cd 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/MiNiFiListener.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.util.LimitingInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +38,11 @@ public class MiNiFiListener {
     private Listener listener;
     private ServerSocket serverSocket;
 
-    public int start(RunMiNiFi runner) throws IOException {
+    public int start(RunMiNiFi runner, BootstrapFileProvider bootstrapFileProvider, ConfigurationChangeListener configurationChangeListener) throws IOException {
         serverSocket = new ServerSocket();
         serverSocket.bind(new InetSocketAddress("localhost", 0));
 
-        listener = new Listener(serverSocket, runner);
+        listener = new Listener(serverSocket, new BootstrapCodec(runner, bootstrapFileProvider, configurationChangeListener));
         Thread listenThread = new Thread(listener);
         listenThread.setName("MiNiFi listener");
         listenThread.setDaemon(true);
@@ -64,19 +65,18 @@ public class MiNiFiListener {
 
         private final ServerSocket serverSocket;
         private final ExecutorService executor;
-        private final RunMiNiFi runner;
         private volatile boolean stopped = false;
+        private final BootstrapCodec codec;
 
-        public Listener(ServerSocket serverSocket, RunMiNiFi runner) {
+        public Listener(ServerSocket serverSocket, BootstrapCodec bootstrapCodec) {
             this.serverSocket = serverSocket;
+            this.codec = bootstrapCodec;
             this.executor = Executors.newFixedThreadPool(2, runnable -> {
                 Thread t = Executors.defaultThreadFactory().newThread(runnable);
                 t.setDaemon(true);
                 t.setName("MiNiFi Bootstrap Command Listener");
                 return t;
             });
-
-            this.runner = runner;
         }
 
         public void stop() {
@@ -126,8 +126,7 @@ public class MiNiFiListener {
                         // which in turn may cause the Shutdown Hook to shutdown MiNiFi.
                         // So we will limit the amount of data to read to 4 KB
                         try (InputStream limitingIn = new LimitingInputStream(socket.getInputStream(), 4096)) {
-                            BootstrapCodec codec = new BootstrapCodec(runner, limitingIn, socket.getOutputStream());
-                            codec.communicate();
+                            codec.communicate(limitingIn, socket.getOutputStream());
                         } catch (Exception t) {
                             LOGGER.error("Failed to communicate with MiNiFi due to exception: ", t);
                         } finally {
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java
new file mode 100644
index 0000000000..a2fc043a8f
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdateConfigurationService.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.util.Optional.ofNullable;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.CONFIG_UPDATED_FILE_NAME;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.WholeConfigDifferentiator;
+import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream;
+import org.apache.nifi.minifi.bootstrap.util.ConfigTransformer;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateConfigurationService {
+
+    private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationService.class);
+    private static final String FALLBACK_CONFIG_FILE_DIR = "./conf/";
+
+    private final Differentiator<ByteBuffer> differentiator;
+    private final RunMiNiFi runMiNiFi;
+    private final ConfigurationChangeListener miNiFiConfigurationChangeListener;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdateConfigurationService(RunMiNiFi runMiNiFi, ConfigurationChangeListener miNiFiConfigurationChangeListener, BootstrapFileProvider bootstrapFileProvider) {
+        this.differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+        this.differentiator.initialize(runMiNiFi);
+        this.runMiNiFi = runMiNiFi;
+        this.miNiFiConfigurationChangeListener = miNiFiConfigurationChangeListener;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        logger.info("Handling configuration update");
+        MiNiFiCommandState commandState = null;
+        try (FileInputStream configFile = new FileInputStream(getConfigFilePath().toFile())) {
+            ByteBuffer readOnlyNewConfig = ConfigTransformer.overrideNonFlowSectionsFromOriginalSchema(
+                    IOUtils.toByteArray(configFile), runMiNiFi.getConfigFileReference().get().duplicate(), bootstrapFileProvider.getBootstrapProperties());
+            if (differentiator.isNew(readOnlyNewConfig)) {
+                miNiFiConfigurationChangeListener.handleChange(new ByteBufferInputStream(readOnlyNewConfig.duplicate()));
+            } else {
+                logger.info("The given configuration does not contain any update. No operation required");
+                commandState = MiNiFiCommandState.NO_OPERATION;
+            }
+        } catch (Exception e) {
+            commandState = MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART;
+            logger.error("Could not handle configuration update", e);
+        }
+        return Optional.ofNullable(commandState);
+    }
+
+    private Path getConfigFilePath() {
+        return ofNullable(safeGetPropertiesFilePath())
+            .map(File::new)
+            .map(File::getParent)
+            .map(parentDir -> new File(parentDir + CONFIG_UPDATED_FILE_NAME))
+            .orElse(new File(FALLBACK_CONFIG_FILE_DIR + CONFIG_UPDATED_FILE_NAME)).toPath();
+    }
+
+    private String safeGetPropertiesFilePath() {
+        String propertyFile = null;
+        try {
+            propertyFile = bootstrapFileProvider.getBootstrapProperties().getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
+        } catch (IOException e) {
+            logger.error("Failed to get properties file path");
+        }
+        return propertyFile;
+    }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java
new file mode 100644
index 0000000000..4b3de7e89a
--- /dev/null
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/service/UpdatePropertiesService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.service;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.apache.nifi.minifi.bootstrap.RunMiNiFi.CONF_DIR_KEY;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.asByteArrayInputStream;
+import static org.apache.nifi.minifi.bootstrap.util.ConfigTransformer.generateConfigFiles;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.Properties;
+import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
+import org.slf4j.Logger;
+
+public class UpdatePropertiesService {
+    private final RunMiNiFi runner;
+    private final Logger logger;
+    private final BootstrapFileProvider bootstrapFileProvider;
+
+    public UpdatePropertiesService(RunMiNiFi runner, Logger logger, BootstrapFileProvider bootstrapFileProvider) {
+        this.runner = runner;
+        this.logger = logger;
+        this.bootstrapFileProvider = bootstrapFileProvider;
+    }
+
+    public Optional<MiNiFiCommandState> handleUpdate() {
+        Optional<MiNiFiCommandState> commandState;
+        try {
+            File bootstrapConfigFile = BootstrapFileProvider.getBootstrapConfFile();
+
+            File bootstrapSwapConfigFile = bootstrapFileProvider.getBootstrapConfSwapFile();
+            logger.info("Persisting old bootstrap configuration to {}", bootstrapSwapConfigFile.getAbsolutePath());
+
+            try (FileInputStream configFileInputStream = new FileInputStream(bootstrapConfigFile)) {
+                Files.copy(configFileInputStream, bootstrapSwapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+
+            Files.copy(bootstrapFileProvider.getBootstrapConfNewFile().toPath(), bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+
+            // already from new
+            commandState = generateConfigfilesBasedOnNewProperties(bootstrapConfigFile, bootstrapSwapConfigFile, bootstrapFileProvider.getBootstrapProperties());
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART);
+            logger.error("Failed to load new bootstrap properties", e);
+        }
+        return commandState;
+    }
+
+    private Optional<MiNiFiCommandState> generateConfigfilesBasedOnNewProperties(File bootstrapConfigFile, File bootstrapSwapConfigFile, Properties bootstrapProperties)
+        throws IOException, ConfigurationChangeException {
+        Optional<MiNiFiCommandState> commandState = Optional.empty();
+        try {
+            ByteBuffer byteBuffer = generateConfigFiles(asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()),
+                bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+            restartInstance();
+        } catch (Exception e) {
+            commandState = Optional.of(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART);
+            // reverting config file
+            try (FileInputStream swapConfigFileStream = new FileInputStream(bootstrapSwapConfigFile)) {
+                Files.copy(swapConfigFileStream, bootstrapConfigFile.toPath(), REPLACE_EXISTING);
+            }
+            // read reverted properties
+            bootstrapProperties = bootstrapFileProvider.getBootstrapProperties();
+
+            ByteBuffer byteBuffer = generateConfigFiles(
+                asByteArrayInputStream(runner.getConfigFileReference().get().duplicate()), bootstrapProperties.getProperty(CONF_DIR_KEY), bootstrapProperties);
+            runner.getConfigFileReference().set(byteBuffer.asReadOnlyBuffer());
+
+            logger.debug("Transformation of new config file failed after swap file was created, deleting it.");
+            if (!bootstrapSwapConfigFile.delete()) {
+                logger.warn("The swap file ({}) failed to delete after a failed handling of a change. It should be cleaned up manually.", bootstrapSwapConfigFile);
+            }
+        }
+        return commandState;
+    }
+
+    private void restartInstance() throws IOException {
+        try {
+            runner.reload();
+        } catch (IOException e) {
+            throw new IOException("Unable to successfully restart MiNiFi instance after configuration change.", e);
+        }
+    }
+}
diff --git a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
index 2251e6a84a..5b9d0f2a88 100644
--- a/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
+++ b/minifi/minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformer.java
@@ -194,6 +194,12 @@ public final class ConfigTransformer {
         return schema;
     }
 
+    public static ByteArrayInputStream asByteArrayInputStream(ByteBuffer byteBuffer) {
+        byte[] config = new byte[byteBuffer.remaining()];
+        byteBuffer.get(config);
+        return new ByteArrayInputStream(config);
+    }
+
     protected static void writeNiFiPropertiesFile(ByteArrayOutputStream nifiPropertiesOutputStream, String destPath) throws IOException {
         final Path nifiPropertiesPath = Paths.get(destPath, "nifi.properties");
         try (FileOutputStream nifiProperties = new FileOutputStream(nifiPropertiesPath.toString())) {
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
index 366d70b9b7..81315879b8 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/command/CommandRunnerFactoryTest.java
@@ -30,9 +30,11 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.verifyNoInteractions;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import org.apache.nifi.minifi.bootstrap.MiNiFiParameters;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.apache.nifi.minifi.bootstrap.service.BootstrapFileProvider;
 import org.apache.nifi.minifi.bootstrap.service.CurrentPortProvider;
 import org.apache.nifi.minifi.bootstrap.service.GracefulShutdownParameterProvider;
@@ -72,6 +74,10 @@ class CommandRunnerFactoryTest {
     private GracefulShutdownParameterProvider gracefulShutdownParameterProvider;
     @Mock
     private MiNiFiExecCommandProvider miNiFiExecCommandProvider;
+    @Mock
+    private ObjectMapper objectMapper;
+    @Mock
+    private ConfigurationChangeListener configurationChangeListener;
 
     @InjectMocks
     private CommandRunnerFactory commandRunnerFactory;
@@ -81,8 +87,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(START);
 
         assertInstanceOf(StartRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -90,8 +95,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(RUN);
 
         assertInstanceOf(StartRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -99,8 +103,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(STOP);
 
         assertInstanceOf(StopRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -108,8 +111,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(ENV);
 
         assertInstanceOf(EnvRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -117,8 +119,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(DUMP);
 
         assertInstanceOf(DumpRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -126,8 +127,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(FLOWSTATUS);
 
         assertInstanceOf(FlowStatusRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -135,8 +135,7 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(STATUS);
 
         assertInstanceOf(StatusRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
@@ -144,12 +143,16 @@ class CommandRunnerFactoryTest {
         CommandRunner runner = commandRunnerFactory.getRunner(RESTART);
 
         assertInstanceOf(CompositeCommandRunner.class, runner);
-        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
-            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider);
+        verify();
     }
 
     @Test
     void testRunCommandShouldThrowIllegalArgumentExceptionInCaseOfUnknownCommand() {
         assertThrows(IllegalArgumentException.class, () -> commandRunnerFactory.getRunner(UNKNOWN));
     }
+
+    private void verify() {
+        verifyNoInteractions(miNiFiCommandSender, currentPortProvider, miNiFiParameters, miNiFiStatusProvider, periodicStatusReporterManager, bootstrapFileProvider,
+            miNiFiStdLogHandler, bootstrapConfigFile, runMiNiFi, gracefulShutdownParameterProvider, miNiFiExecCommandProvider, objectMapper, configurationChangeListener);
+    }
 }
\ No newline at end of file
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
index a1f9401c09..c12869d7e9 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
@@ -27,7 +27,6 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import okhttp3.Request;
 import org.apache.commons.io.FileUtils;
@@ -43,7 +42,6 @@ public class WholeConfigDifferentiatorTest {
 
     public static ByteBuffer defaultConfigBuffer;
     public static ByteBuffer newConfigBuffer;
-    public static Properties properties = new Properties();
     public static ConfigurationFileHolder configurationFileHolder;
 
     public static Request dummyRequest;
@@ -68,7 +66,7 @@ public class WholeConfigDifferentiatorTest {
     @Test
     public void TestSameInputStream() throws IOException {
         Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
 
         FileInputStream fileInputStream = new FileInputStream(defaultConfigPath.toFile());
         assertFalse(differentiator.isNew(fileInputStream));
@@ -77,7 +75,7 @@ public class WholeConfigDifferentiatorTest {
     @Test
     public void TestNewInputStream() throws IOException {
         Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
 
         FileInputStream fileInputStream = new FileInputStream(newConfigPath.toFile());
         assertTrue(differentiator.isNew(fileInputStream));
@@ -88,7 +86,7 @@ public class WholeConfigDifferentiatorTest {
     @Test
     public void TestSameByteBuffer() throws IOException {
         Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
 
         assertFalse(differentiator.isNew(defaultConfigBuffer));
     }
@@ -96,7 +94,7 @@ public class WholeConfigDifferentiatorTest {
     @Test
     public void TestNewByteBuffer() throws IOException {
         Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
-        differentiator.initialize(properties, configurationFileHolder);
+        differentiator.initialize(configurationFileHolder);
 
         assertTrue(differentiator.isNew(newConfigBuffer));
     }
diff --git a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
index ccbe6b40a7..0ebb6c68f9 100644
--- a/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
+++ b/minifi/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/service/BootstrapCodecTest.java
@@ -29,36 +29,57 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
+import org.apache.nifi.c2.protocol.api.C2Operation;
 import org.apache.nifi.minifi.bootstrap.RunMiNiFi;
 import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeCoordinator;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
 
+@ExtendWith(MockitoExtension.class)
 class BootstrapCodecTest {
 
     private static final int VALID_PORT = 1;
     private static final String SECRET = "secret";
     private static final String OK = "OK";
     private static final String EMPTY_STRING = "";
+
+    @Mock
     private RunMiNiFi runner;
+    @Mock
+    private BootstrapFileProvider bootstrapFileProvider;
+    @Mock
+    private ConfigurationChangeListener configurationChangeListener;
+    @Mock
+    private UpdateConfigurationService updateConfigurationService;
+    @Mock
+    private UpdatePropertiesService updatePropertiesService;
+
+    @InjectMocks
+    private BootstrapCodec bootstrapCodec;
 
     @BeforeEach
-    void setup() {
-        runner = mock(RunMiNiFi.class);
+    void setup() throws IllegalAccessException, NoSuchFieldException {
+        mockFinal("updateConfigurationService", updateConfigurationService);
+        mockFinal("updatePropertiesService", updatePropertiesService);
     }
 
     @Test
     void testCommunicateShouldThrowIOExceptionIfThereIsNoCommand() {
         InputStream inputStream = new ByteArrayInputStream(new byte[0]);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
         assertEquals(EMPTY_STRING, outputStream.toString().trim());
         verifyNoInteractions(runner);
     }
@@ -68,9 +89,8 @@ class BootstrapCodecTest {
         String unknown = "unknown";
         InputStream inputStream = new ByteArrayInputStream(unknown.getBytes(StandardCharsets.UTF_8));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
         assertEquals(EMPTY_STRING, outputStream.toString().trim());
         verifyNoInteractions(runner);
     }
@@ -80,9 +100,8 @@ class BootstrapCodecTest {
         String command = "PORT " + VALID_PORT + " " + SECRET;
         InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        bootstrapCodec.communicate();
+        bootstrapCodec.communicate(inputStream, outputStream);
 
         verify(runner).setMiNiFiParameters(VALID_PORT, SECRET);
         assertEquals(OK, outputStream.toString().trim());
@@ -93,9 +112,8 @@ class BootstrapCodecTest {
     void testCommunicateShouldFailWhenReceivesPortCommand(String command) {
         InputStream inputStream = new ByteArrayInputStream(command.getBytes(StandardCharsets.UTF_8));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
         assertEquals(EMPTY_STRING, outputStream.toString().trim());
         verifyNoInteractions(runner);
     }
@@ -112,9 +130,8 @@ class BootstrapCodecTest {
     void testCommunicateShouldFailIfStartedCommandHasOtherThanOneArg() {
         InputStream inputStream = new ByteArrayInputStream("STARTED".getBytes(StandardCharsets.UTF_8));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
         assertEquals(EMPTY_STRING, outputStream.toString().trim());
         verifyNoInteractions(runner);
     }
@@ -123,9 +140,8 @@ class BootstrapCodecTest {
     void testCommunicateShouldFailIfStartedCommandFirstArgIsNotBoolean() {
         InputStream inputStream = new ByteArrayInputStream("STARTED yes".getBytes(StandardCharsets.UTF_8));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
 
-        assertThrows(IOException.class, bootstrapCodec::communicate);
+        assertThrows(IOException.class, () -> bootstrapCodec.communicate(inputStream, outputStream));
         assertEquals(EMPTY_STRING, outputStream.toString().trim());
         verifyNoInteractions(runner);
     }
@@ -136,11 +152,10 @@ class BootstrapCodecTest {
         PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
         ConfigurationChangeCoordinator configurationChangeCoordinator = mock(ConfigurationChangeCoordinator.class);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
         when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
         when(runner.getConfigurationChangeCoordinator()).thenReturn(configurationChangeCoordinator);
 
-        bootstrapCodec.communicate();
+        bootstrapCodec.communicate(inputStream, outputStream);
 
         assertEquals(OK, outputStream.toString().trim());
         verify(runner, times(2)).getPeriodicStatusReporterManager();
@@ -157,10 +172,9 @@ class BootstrapCodecTest {
 
         PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
         when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
 
-        bootstrapCodec.communicate();
+        bootstrapCodec.communicate(inputStream, outputStream);
 
         assertEquals(OK, outputStream.toString().trim());
         verify(runner).getPeriodicStatusReporterManager();
@@ -172,13 +186,42 @@ class BootstrapCodecTest {
     void testCommunicateShouldHandleReloadCommand() throws IOException {
         InputStream inputStream = new ByteArrayInputStream("RELOAD".getBytes(StandardCharsets.UTF_8));
 
-        PeriodicStatusReporterManager periodicStatusReporterManager = mock(PeriodicStatusReporterManager.class);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        BootstrapCodec bootstrapCodec = new BootstrapCodec(runner, inputStream, outputStream);
-        when(runner.getPeriodicStatusReporterManager()).thenReturn(periodicStatusReporterManager);
 
-        bootstrapCodec.communicate();
+        bootstrapCodec.communicate(inputStream, outputStream);
+
+        assertEquals(OK, outputStream.toString().trim());
+    }
+
+    @Test
+    void testUpdateConfigurationCommandShouldHandleUpdateConfiguration() throws IOException {
+        InputStream inputStream = new ByteArrayInputStream("UPDATE_CONFIGURATION".getBytes(StandardCharsets.UTF_8));
+        C2Operation c2Operation = new C2Operation();
+        c2Operation.setIdentifier("id");
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        bootstrapCodec.communicate(inputStream, outputStream);
 
         assertEquals(OK, outputStream.toString().trim());
+        verify(updateConfigurationService).handleUpdate();
+    }
+
+    @Test
+    void testUpdatePropertiesCommandShouldHandleUpdateProperties() throws IOException {
+        InputStream inputStream = new ByteArrayInputStream("UPDATE_PROPERTIES".getBytes(StandardCharsets.UTF_8));
+        C2Operation c2Operation = new C2Operation();
+        c2Operation.setIdentifier("id");
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        bootstrapCodec.communicate(inputStream, outputStream);
+
+        assertEquals(OK, outputStream.toString().trim());
+        verify(updatePropertiesService).handleUpdate();
+    }
+
+    private void mockFinal(String fieldName, Object value) throws NoSuchFieldException, IllegalAccessException {
+        Field updateConfigurationServiceField = BootstrapCodec.class.getDeclaredField(fieldName);
+        updateConfigurationServiceField.setAccessible(true);
+        updateConfigurationServiceField.set(bootstrapCodec, value);
     }
 }
\ No newline at end of file
diff --git a/minifi/minifi-commons/minifi-commons-api/pom.xml b/minifi/minifi-commons/minifi-commons-api/pom.xml
new file mode 100644
index 0000000000..aadd4dc251
--- /dev/null
+++ b/minifi/minifi-commons/minifi-commons-api/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <artifactId>minifi-commons</artifactId>
+        <version>1.20.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>minifi-commons-api</artifactId>
+    <packaging>jar</packaging>
+
+</project>
\ No newline at end of file
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
similarity index 60%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
index 1f1cb19035..451060bb82 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiCommandState.java
@@ -15,28 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.minifi.commons.api;
 
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    ASSET;
-
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
+/**
+ * Response State for Commands coming from MiNiFi.
+ */
+public enum MiNiFiCommandState {
+    FULLY_APPLIED, NO_OPERATION, NOT_APPLIED_WITHOUT_RESTART, NOT_APPLIED_WITH_RESTART;
 }
diff --git a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
similarity index 60%
copy from c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
copy to minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
index 1f1cb19035..1bbac63869 100644
--- a/c2/c2-protocol/c2-protocol-api/src/main/java/org/apache/nifi/c2/protocol/api/OperandType.java
+++ b/minifi/minifi-commons/minifi-commons-api/src/main/java/org/apache/nifi/minifi/commons/api/MiNiFiConstants.java
@@ -15,28 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.nifi.c2.protocol.api;
+package org.apache.nifi.minifi.commons.api;
 
-import java.util.Arrays;
-import java.util.Optional;
-
-public enum OperandType {
-
-    CONFIGURATION,
-    CONNECTION,
-    DEBUG,
-    MANIFEST,
-    REPOSITORY,
-    ASSET;
-
-    public static Optional<OperandType> fromString(String value) {
-        return Arrays.stream(values())
-            .filter(operandType -> operandType.name().equalsIgnoreCase(value))
-            .findAny();
-    }
-
-    @Override
-    public String toString() {
-        return super.toString().toLowerCase();
-    }
+public interface MiNiFiConstants {
+    String BOOTSTRAP_UPDATED_FILE_NAME = "bootstrap-updated.conf";
+    String CONFIG_UPDATED_FILE_NAME = "config-updated.yml";
 }
diff --git a/minifi/minifi-commons/pom.xml b/minifi/minifi-commons/pom.xml
index be41c74424..b6ef4b6c27 100644
--- a/minifi/minifi-commons/pom.xml
+++ b/minifi/minifi-commons/pom.xml
@@ -28,5 +28,6 @@ limitations under the License.
     <modules>
         <module>minifi-commons-schema</module>
         <module>minifi-utils</module>
+        <module>minifi-commons-api</module>
     </modules>
 </project>
diff --git a/minifi/minifi-docker/pom.xml b/minifi/minifi-docker/pom.xml
index e8a8412302..10344dcbcb 100644
--- a/minifi/minifi-docker/pom.xml
+++ b/minifi/minifi-docker/pom.xml
@@ -97,6 +97,7 @@ limitations under the License.
                                     </buildArgs>
                                     <repository>apacheminifi</repository>
                                     <tag>${minifi.version}</tag>
+                                    <tag>latest</tag>
                                 </configuration>
                             </execution>
                         </executions>
diff --git a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
index 126a391634..2f43cfa1b7 100644
--- a/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
+++ b/minifi/minifi-docs/src/main/markdown/minifi-java-agent-quick-start.md
@@ -126,15 +126,8 @@ c2.agent.heartbeat.period=5000
 #(Optional) c2.rest.callTimeout=10 sec
 #(Optional) c2.agent.identifier=123-456-789
 c2.agent.class=agentClassName
-```
-3. Configure MiNiFi to recognize _config.yml_ changes
-```
-nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
-nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
-nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
-```
-4. Start MiNiFi
-5. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
+3. Start MiNiFi
+4. When a new flow is available on the C2 server, MiNiFi will download it via C2 and restart itself to pick up the changes
 
 **Note:** Flow definitions are class based. Each class has one flow defined for it. As a result, all the agents belonging to the same class will get the flow at update.<br>
 **Note:** Compression can be turned on for C2 requests by setting `c2.request.compression=gzip`. Compression is turned off by default when the parameter is omitted, or when `c2.request.compression=none` is given. It can be beneficial to turn compression on to prevent network saturation.
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge1/expected.json
@@ -3,6 +3,6 @@
     "pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
   },
   {
-    "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+    "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
   }
 ]
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge2/expected.json
@@ -3,6 +3,6 @@
     "pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
   },
   {
-    "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+    "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
   }
 ]
\ No newline at end of file
diff --git a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
index ef85cefb9c..244a68e8c0 100644
--- a/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
+++ b/minifi/minifi-integration-tests/src/test/resources/c2/hierarchical/minifi-edge3/expected.json
@@ -3,6 +3,6 @@
     "pattern": "ConfigurationChangeCoordinator Notifying Listeners of a change"
   },
   {
-    "pattern": "MiNiFi has finished reloading successfully and swap file exists. Deleting old configuration"
+    "pattern": "MiNiFi has finished reloading successfully and swap.yml file exists. Deleting old configuration"
   }
 ]
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
index 810c920f25..71b68792bb 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/pom.xml
@@ -26,6 +26,11 @@ limitations under the License.
     <artifactId>minifi-framework-core</artifactId>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi.minifi</groupId>
+            <artifactId>minifi-commons-api</artifactId>
+            <version>1.20.0-SNAPSHOT</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi.minifi</groupId>
             <artifactId>minifi-framework-api</artifactId>
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java
new file mode 100644
index 0000000000..755df830c2
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/MiNiFiProperties.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.BOOLEAN_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.LONG_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.NON_NEGATIVE_INTEGER_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.PORT_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.TIME_PERIOD_VALIDATOR;
+import static org.apache.nifi.minifi.MiNiFiProperties.ValidatorNames.VALID;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum MiNiFiProperties {
+    JAVA("java", "java", false, false, VALID),
+    RUN_AS("run.as", null, false, false, VALID),
+    LIB_DIR("lib.dir", "./lib", false, false, VALID),
+    CONF_DIR("conf.dir", "./conf", false, false, VALID),
+    GRACEFUL_SHUTDOWN_SECOND("graceful.shutdown.seconds", "20", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+    NIFI_MINIFI_CONFIG("nifi.minifi.config", "./conf/config.yml", false, true, VALID),
+    NIFI_MINIFI_SECURITY_KEYSTORE("nifi.minifi.security.keystore", null, false, false, VALID),
+    NIFI_MINIFI_SECURITY_KEYSTORE_TYPE("nifi.minifi.security.keystoreType", null, false, false, VALID),
+    NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD("nifi.minifi.security.keystorePasswd", null, true, false, VALID),
+    NIFI_MINIFI_SECURITY_KEY_PASSWD("nifi.minifi.security.keyPasswd", null, true, false, VALID),
+    NIFI_MINIFI_SECURITY_TRUSTSTORE("nifi.minifi.security.truststore", null, false, false, VALID),
+    NIFI_MINIFI_SECURITY_TRUSTSTORE_TYPE("nifi.minifi.security.truststoreType", null, false, false, VALID),
+    NIFI_MINIFI_SECURITY_TRUSTSTORE_PASSWD("nifi.minifi.security.truststorePasswd", null, true, false, VALID),
+    NIFI_MINIFI_SECURITY_SSL_PROTOCOL("nifi.minifi.security.ssl.protocol", null, false, false, VALID),
+    NIFI_MINIFI_SENSITIVE_PROPS_KEY("nifi.minifi.sensitive.props.key", null, true, false, VALID),
+    NIFI_MINIFI_SENSITIVE_PROPS_ALGORITHM("nifi.minifi.sensitive.props.algorithm", null, true, false, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_COMMENT("nifi.minifi.provenance.reporting.comment", null, false, true, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_STRATEGY("nifi.minifi.provenance.reporting.scheduling.strategy", null, false, true, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_SCHEDULING_PERIOD("nifi.minifi.provenance.reporting.scheduling.period", null, false, true, TIME_PERIOD_VALIDATOR),
+    NIFI_MINIFI_PROVENANCE_REPORTING_DESTINATION_URL("nifi.minifi.provenance.reporting.destination.url", null, false, true, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_INPUT_PORT_NAME("nifi.minifi.provenance.reporting.input.port.name", null, false, true, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_INSTANCE_URL("nifi.minifi.provenance.reporting.instance.url", null, false, true, VALID),
+    NIFI_MINIFI_PROVENANCE_REPORTING_BATCH_SIZE("nifi.minifi.provenance.reporting.batch.size", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+    NIFI_MINIFI_PROVENANCE_REPORTING_COMMUNICATIONS_TIMEOUT("nifi.minifi.provenance.reporting.communications.timeout", null, false, true, TIME_PERIOD_VALIDATOR),
+    NIFI_MINIFI_FLOW_USE_PARENT_SSL("nifi.minifi.flow.use.parent.ssl", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS("nifi.minifi.notifier.ingestors", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_CONFIG_PATH("nifi.minifi.notifier.ingestors.file.config.path", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_FILE_POLLING_PERIOD_SECONDS("nifi.minifi.notifier.ingestors.file.polling.period.seconds", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_RECEIVE_HTTP_PORT("nifi.minifi.notifier.ingestors.receive.http.port", null, false, true, PORT_VALIDATOR),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_HOSTNAME("nifi.minifi.notifier.ingestors.pull.http.hostname", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PORT("nifi.minifi.notifier.ingestors.pull.http.port", null, false, true, PORT_VALIDATOR),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PATH("nifi.minifi.notifier.ingestors.pull.http.path", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_QUERY("nifi.minifi.notifier.ingestors.pull.http.query", null, false, true, VALID),
+    NIFI_MINIFI_NOTIFIER_INGESTORS_PULL_HTTP_PERIOD_MS("nifi.minifi.notifier.ingestors.pull.http.period.ms", null, false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+    NIFI_MINIFI_STATUS_REPORTER_COMPONENTS("nifi.minifi.status.reporter.components", null, false, true, VALID),
+    NIFI_MINIFI_STATUS_REPORTER_LOG_QUERY("nifi.minifi.status.reporter.log.query", null, false, true, VALID),
+    NIFI_MINIFI_STATUS_REPORTER_LOG_LEVEL("nifi.minifi.status.reporter.log.level", null, false, true, VALID),
+    NIFI_MINIFI_STATUS_REPORTER_LOG_PERIOD("nifi.minifi.status.reporter.log.period", null, false, true, VALID),
+    JAVA_ARG_1("java.arg.1", null, false, true, VALID),
+    JAVA_ARG_2("java.arg.2", null, false, true, VALID),
+    JAVA_ARG_3("java.arg.3", null, false, true, VALID),
+    JAVA_ARG_4("java.arg.4", null, false, true, VALID),
+    JAVA_ARG_5("java.arg.5", null, false, true, VALID),
+    JAVA_ARG_6("java.arg.6", null, false, true, VALID),
+    JAVA_ARG_7("java.arg.7", null, false, true, VALID),
+    JAVA_ARG_8("java.arg.8", null, false, true, VALID),
+    JAVA_ARG_9("java.arg.9", null, false, true, VALID),
+    JAVA_ARG_10("java.arg.10", null, false, true, VALID),
+    JAVA_ARG_11("java.arg.11", null, false, true, VALID),
+    JAVA_ARG_12("java.arg.12", null, false, true, VALID),
+    JAVA_ARG_13("java.arg.13", null, false, true, VALID),
+    JAVA_ARG_14("java.arg.14", null, false, true, VALID),
+    C2_ENABLE("c2.enable", "false", false, true, BOOLEAN_VALIDATOR),
+    C2_REST_URL("c2.rest.url", "", false, true, VALID),
+    C2_REST_URL_ACK("c2.rest.url.ack", "", false, true, VALID),
+    C2_REST_CONNECTION_TIMEOUT("c2.rest.connectionTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
+    C2_REST_READ_TIMEOUT("c2.rest.readTimeout", "5 sec", false, true, TIME_PERIOD_VALIDATOR),
+    C2_REST_CALL_TIMEOUT("c2.rest.callTimeout", "10 sec", false, true, TIME_PERIOD_VALIDATOR),
+    C2_MAX_IDLE_CONNECTIONS("c2.rest.maxIdleConnections", "5", false, true, NON_NEGATIVE_INTEGER_VALIDATOR),
+    C2_KEEP_ALIVE_DURATION("c2.rest.keepAliveDuration", "5 min", false, true, TIME_PERIOD_VALIDATOR),
+    C2_AGENT_HEARTBEAT_PERIOD("c2.agent.heartbeat.period", "1000", false, true, LONG_VALIDATOR),
+    C2_AGENT_CLASS("c2.agent.class", "", false, true, VALID),
+    C2_CONFIG_DIRECTORY("c2.config.directory", "./conf", false, true, VALID),
+    C2_RUNTIME_MANIFEST_IDENTIFIER("c2.runtime.manifest.identifier", "", false, true, VALID),
+    C2_RUNTIME_TYPE("c2.runtime.type", "", false, true, VALID),
+    C2_AGENT_IDENTIFIER("c2.agent.identifier", null, false, true, VALID),
+    C2_FULL_HEARTBEAT("c2.full.heartbeat", "true", false, true, BOOLEAN_VALIDATOR),
+    C2_SECURITY_TRUSTSTORE_LOCATION("c2.security.truststore.location", "", false, false, VALID),
+    C2_SECURITY_TRUSTSTORE_PASSWORD("c2.security.truststore.password", "", true, false, VALID),
+    C2_SECURITY_TRUSTSTORE_TYPE("c2.security.truststore.type", "JKS", false, false, VALID),
+    C2_SECURITY_KEYSTORE_LOCATION("c2.security.keystore.location", "", false, false, VALID),
+    C2_SECURITY_KEYSTORE_PASSWORD("c2.security.keystore.password", "", true, false, VALID),
+    C2_SECURITY_KEYSTORE_TYPE("c2.security.keystore.type", "JKS", false, false, VALID),
+    C2_REQUEST_COMPRESSION("c2.request.compression", "none", false, true, VALID),
+    C2_ASSET_DIRECTORY("c2.asset.directory", "./asset", false, true, VALID);
+
+    public static final LinkedHashMap<String, MiNiFiProperties> PROPERTIES_BY_KEY = Arrays.stream(MiNiFiProperties.values())
+        .sorted()
+        .collect(Collectors.toMap(MiNiFiProperties::getKey, Function.identity(), (x, y) -> y, LinkedHashMap::new));
+
+    private final String key;
+    private final String defaultValue;
+    private final boolean sensitive;
+    private final boolean modifiable;
+    private final ValidatorNames validator;
+
+    MiNiFiProperties(String key, String defaultValue, boolean sensitive, boolean modifiable, ValidatorNames validator) {
+        this.key = key;
+        this.defaultValue = defaultValue;
+        this.sensitive = sensitive;
+        this.modifiable = modifiable;
+        this.validator = validator;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public boolean isSensitive() {
+        return sensitive;
+    }
+
+    public boolean isModifiable() {
+        return modifiable;
+    }
+
+    public ValidatorNames getValidator() {
+        return validator;
+    }
+
+    public enum ValidatorNames {
+        VALID, BOOLEAN_VALIDATOR, LONG_VALIDATOR, NON_NEGATIVE_INTEGER_VALIDATOR, TIME_PERIOD_VALIDATOR, PORT_VALIDATOR
+    }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
deleted file mode 100644
index 71282c0ed5..0000000000
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NiFiProperties.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.c2;
-
-import java.util.concurrent.TimeUnit;
-
-public class C2NiFiProperties {
-
-    public static final String C2_PREFIX = "c2.";
-
-    public static final String C2_ENABLE_KEY = C2_PREFIX + "enable";
-    public static final String C2_AGENT_PROTOCOL_KEY = C2_PREFIX + "agent.protocol.class";
-    public static final String C2_COAP_HOST_KEY = C2_PREFIX + "agent.coap.host";
-    public static final String C2_COAP_PORT_KEY = C2_PREFIX + "agent.coap.port";
-    public static final String C2_CONFIG_DIRECTORY_KEY = C2_PREFIX + "config.directory";
-    public static final String C2_RUNTIME_MANIFEST_IDENTIFIER_KEY = C2_PREFIX + "runtime.manifest.identifier";
-    public static final String C2_RUNTIME_TYPE_KEY = C2_PREFIX + "runtime.type";
-    public static final String C2_REST_URL_KEY = C2_PREFIX + "rest.url";
-    public static final String C2_REST_URL_ACK_KEY = C2_PREFIX + "rest.url.ack";
-    public static final String C2_ROOT_CLASSES_KEY = C2_PREFIX + "root.classes";
-    public static final String C2_AGENT_HEARTBEAT_PERIOD_KEY = C2_PREFIX + "agent.heartbeat.period";
-    public static final String C2_CONNECTION_TIMEOUT = C2_PREFIX + "rest.connectionTimeout";
-    public static final String C2_READ_TIMEOUT = C2_PREFIX + "rest.readTimeout";
-    public static final String C2_CALL_TIMEOUT = C2_PREFIX + "rest.callTimeout";
-    public static final String C2_AGENT_CLASS_KEY = C2_PREFIX + "agent.class";
-    public static final String C2_AGENT_IDENTIFIER_KEY = C2_PREFIX + "agent.identifier";
-    public static final String C2_FULL_HEARTBEAT_KEY = C2_PREFIX + "full.heartbeat";
-    public static final String C2_REQUEST_COMPRESSION_KEY = C2_PREFIX + "request.compression";
-    public static final String C2_ASSET_DIRECTORY_KEY = C2_PREFIX + "asset.directory";
-
-    public static final String C2_ROOT_CLASS_DEFINITIONS_KEY = C2_PREFIX + "root.class.definitions";
-    public static final String C2_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.name";
-    public static final String C2_METRICS_METRICS_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics";
-    public static final String C2_METRICS_METRICS_TYPED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.name";
-    public static final String C2_METRICS_METRICS_QUEUED_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.name";
-    public static final String C2_METRICS_METRICS_QUEUE_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.queuemetrics.classes";
-    public static final String C2_METRICS_METRICS_TYPED_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.typedmetrics.classes";
-    public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_NAME_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.name";
-    public static final String C2_METRICS_METRICS_PROCESSOR_METRICS_CLASSES_KEY = C2_ROOT_CLASS_DEFINITIONS_KEY + ".metrics.metrics.processorMetrics.classes";
-
-    /* C2 Client Security Properties */
-    private static final String C2_REST_SECURITY_BASE_KEY = C2_PREFIX + "security";
-    public static final String TRUSTSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.location";
-    public static final String TRUSTSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.password";
-    public static final String TRUSTSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".truststore.type";
-    public static final String KEYSTORE_LOCATION_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.location";
-    public static final String KEYSTORE_PASSWORD_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.password";
-    public static final String KEYSTORE_TYPE_KEY = C2_REST_SECURITY_BASE_KEY + ".keystore.type";
-
-    // Defaults
-    // Heartbeat period of 1 second
-    public static final long C2_AGENT_DEFAULT_HEARTBEAT_PERIOD = TimeUnit.SECONDS.toMillis(1);
-
-    // Connection timeout of 5 seconds
-    public static final String C2_DEFAULT_CONNECTION_TIMEOUT = "5 sec";
-    // Read timeout of 5 seconds
-    public static final String C2_DEFAULT_READ_TIMEOUT = "5 sec";
-    // Call timeout of 10 seconds
-    public static final String C2_DEFAULT_CALL_TIMEOUT = "10 sec";
-
-    // C2 request compression is turned off by default
-    public static final String C2_REQUEST_COMPRESSION= "none";
-
-    public static final String C2_ASSET_DIRECTORY = "./asset";
-}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
index e280386e90..d56e0d7ce3 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/C2NifiClientService.java
@@ -18,19 +18,48 @@
 package org.apache.nifi.minifi.c2;
 
 import static java.util.Optional.ofNullable;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_CLASS;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_HEARTBEAT_PERIOD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_AGENT_IDENTIFIER;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ASSET_DIRECTORY;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_CONFIG_DIRECTORY;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_FULL_HEARTBEAT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_KEEP_ALIVE_DURATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_MAX_IDLE_CONNECTIONS;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REQUEST_COMPRESSION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CALL_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_CONNECTION_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_READ_TIMEOUT;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_REST_URL_ACK;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_RUNTIME_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_LOCATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_PASSWORD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_KEYSTORE_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_LOCATION;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_PASSWORD;
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_SECURITY_TRUSTSTORE_TYPE;
+import static org.apache.nifi.minifi.MiNiFiProperties.CONF_DIR;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.CONFIG_UPDATED_FILE_NAME;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import org.apache.nifi.bootstrap.BootstrapCommunicator;
 import org.apache.nifi.c2.client.C2ClientConfig;
 import org.apache.nifi.c2.client.http.C2HttpClient;
 import org.apache.nifi.c2.client.service.C2ClientService;
@@ -38,19 +67,24 @@ import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
 import org.apache.nifi.c2.client.service.FlowIdHolder;
 import org.apache.nifi.c2.client.service.ManifestHashProvider;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
-import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.C2OperationHandlerProvider;
 import org.apache.nifi.c2.client.service.operation.DescribeManifestOperationHandler;
 import org.apache.nifi.c2.client.service.operation.EmptyOperandPropertiesProvider;
 import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
 import org.apache.nifi.c2.client.service.operation.SupportedOperationsProvider;
 import org.apache.nifi.c2.client.service.operation.TransferDebugOperationHandler;
 import org.apache.nifi.c2.client.service.operation.UpdateAssetOperationHandler;
 import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
-import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
-import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
+import org.apache.nifi.c2.client.service.operation.UpdatePropertiesOperationHandler;
 import org.apache.nifi.c2.protocol.api.AgentManifest;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
 import org.apache.nifi.c2.serializer.C2JacksonSerializer;
 import org.apache.nifi.controller.FlowController;
@@ -62,6 +96,11 @@ import org.apache.nifi.extension.manifest.parser.ExtensionManifestParser;
 import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParser;
 import org.apache.nifi.manifest.RuntimeManifestService;
 import org.apache.nifi.manifest.StandardRuntimeManifestService;
+import org.apache.nifi.minifi.c2.command.PropertiesPersister;
+import org.apache.nifi.minifi.c2.command.TransferDebugCommandHelper;
+import org.apache.nifi.minifi.c2.command.UpdateAssetCommandHelper;
+import org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider;
+import org.apache.nifi.minifi.commons.api.MiNiFiCommandState;
 import org.apache.nifi.nar.ExtensionManagerHolder;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -70,28 +109,39 @@ import org.slf4j.LoggerFactory;
 
 public class C2NifiClientService {
 
-    private static final Logger logger = LoggerFactory.getLogger(C2NifiClientService.class);
-
-    private static final String DEFAULT_CONF_DIR = "./conf";
-    private static final String TARGET_CONFIG_FILE = "/config-new.yml";
+    private static final Logger LOGGER = LoggerFactory.getLogger(C2NifiClientService.class);
+    private static final String TARGET_CONFIG_FILE = "/" + CONFIG_UPDATED_FILE_NAME;
     private static final String ROOT_GROUP_ID = "root";
     private static final Long INITIAL_DELAY = 10000L;
     private static final Integer TERMINATION_WAIT = 5000;
+    private static final int MINIFI_RESTART_TIMEOUT_SECONDS = 60;
+    private static final String ACKNOWLEDGE_OPERATION = "ACKNOWLEDGE_OPERATION";
+    private static final int IS_ACK_RECEIVED_POLL_INTERVAL = 1000;
+    private static final Map<MiNiFiCommandState, OperationState> OPERATION_STATE_MAP = getOperationStateMap();
+    private static final int MAX_WAIT_FOR_BOOTSTRAP_ACK_MS = 20000;
 
     private final C2ClientService c2ClientService;
 
     private final FlowController flowController;
     private final String propertiesDir;
-    private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+    private final ScheduledThreadPoolExecutor heartbeatExecutorService = new ScheduledThreadPoolExecutor(1);
+    private final ScheduledThreadPoolExecutor bootstrapAcknowledgeExecutorService = new ScheduledThreadPoolExecutor(1);
     private final ExtensionManifestParser extensionManifestParser = new JAXBExtensionManifestParser();
 
     private final RuntimeManifestService runtimeManifestService;
 
     private final SupportedOperationsProvider supportedOperationsProvider;
+    private final RequestedOperationDAO requestedOperationDAO;
+    private final BootstrapCommunicator bootstrapCommunicator;
+    private volatile boolean ackReceived = false;
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final PropertiesPersister propertiesPersister;
+    private final ObjectMapper objectMapper;
+
 
     private final long heartbeatPeriod;
 
-    public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
+    public C2NifiClientService(NiFiProperties niFiProperties, FlowController flowController, BootstrapCommunicator bootstrapCommunicator) {
         C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
         FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
         this.propertiesDir = niFiProperties.getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
@@ -109,58 +159,157 @@ public class C2NifiClientService {
         OperandPropertiesProvider emptyOperandPropertiesProvider = new EmptyOperandPropertiesProvider();
         TransferDebugCommandHelper transferDebugCommandHelper = new TransferDebugCommandHelper(niFiProperties);
         UpdateAssetCommandHelper updateAssetCommandHelper = new UpdateAssetCommandHelper(clientConfig.getC2AssetDirectory());
+        objectMapper = new ObjectMapper();
         updateAssetCommandHelper.createAssetDirectory();
-        C2OperationService c2OperationService = new C2OperationService(Arrays.asList(
+        this.bootstrapCommunicator = bootstrapCommunicator;
+        requestedOperationDAO = new FileBasedRequestedOperationDAO(niFiProperties.getProperty("org.apache.nifi.minifi.bootstrap.config.pid.dir", "bin"), objectMapper);
+        String bootstrapConfigFileLocation = niFiProperties.getProperty("nifi.minifi.bootstrap.file");
+        updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+        propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
+        C2OperationHandlerProvider c2OperationHandlerProvider = new C2OperationHandlerProvider(Arrays.asList(
             new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent, emptyOperandPropertiesProvider),
             new DescribeManifestOperationHandler(heartbeatFactory, this::generateRuntimeInfo, emptyOperandPropertiesProvider),
             TransferDebugOperationHandler.create(client, emptyOperandPropertiesProvider,
                 transferDebugCommandHelper.debugBundleFiles(), transferDebugCommandHelper::excludeSensitiveText),
             UpdateAssetOperationHandler.create(client, emptyOperandPropertiesProvider,
-                updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction)
+                updateAssetCommandHelper::assetUpdatePrecondition, updateAssetCommandHelper::assetPersistFunction),
+            new UpdatePropertiesOperationHandler(updatePropertiesPropertyProvider, propertiesPersister::persistProperties)
         ));
-        this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationService);
-        this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationService.getHandlers());
+        this.c2ClientService = new C2ClientService(client, heartbeatFactory, c2OperationHandlerProvider, requestedOperationDAO, this::registerOperation);
+        this.supportedOperationsProvider = new SupportedOperationsProvider(c2OperationHandlerProvider.getHandlers());
+        bootstrapCommunicator.registerMessageHandler(ACKNOWLEDGE_OPERATION, (params, output) -> acknowledgeHandler(params));
     }
 
     private C2ClientConfig generateClientConfig(NiFiProperties properties) {
         return new C2ClientConfig.Builder()
-            .agentClass(properties.getProperty(C2NiFiProperties.C2_AGENT_CLASS_KEY, ""))
-            .agentIdentifier(properties.getProperty(C2NiFiProperties.C2_AGENT_IDENTIFIER_KEY))
-            .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2NiFiProperties.C2_FULL_HEARTBEAT_KEY, "true")))
-            .heartbeatPeriod(Long.parseLong(properties.getProperty(C2NiFiProperties.C2_AGENT_HEARTBEAT_PERIOD_KEY,
-                String.valueOf(C2NiFiProperties.C2_AGENT_DEFAULT_HEARTBEAT_PERIOD))))
-            .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CONNECTION_TIMEOUT,
-                C2NiFiProperties.C2_DEFAULT_CONNECTION_TIMEOUT), TimeUnit.MILLISECONDS))
-            .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_READ_TIMEOUT,
-                C2NiFiProperties.C2_DEFAULT_READ_TIMEOUT), TimeUnit.MILLISECONDS))
-            .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2NiFiProperties.C2_CALL_TIMEOUT,
-                C2NiFiProperties.C2_DEFAULT_CALL_TIMEOUT), TimeUnit.MILLISECONDS))
-            .c2Url(properties.getProperty(C2NiFiProperties.C2_REST_URL_KEY, ""))
-            .c2RequestCompression(properties.getProperty(C2NiFiProperties.C2_REQUEST_COMPRESSION_KEY, C2NiFiProperties.C2_REQUEST_COMPRESSION))
-            .c2AssetDirectory(properties.getProperty(C2NiFiProperties.C2_ASSET_DIRECTORY_KEY, C2NiFiProperties.C2_ASSET_DIRECTORY))
-            .confDirectory(properties.getProperty(C2NiFiProperties.C2_CONFIG_DIRECTORY_KEY, DEFAULT_CONF_DIR))
-            .runtimeManifestIdentifier(properties.getProperty(C2NiFiProperties.C2_RUNTIME_MANIFEST_IDENTIFIER_KEY, ""))
-            .runtimeType(properties.getProperty(C2NiFiProperties.C2_RUNTIME_TYPE_KEY, ""))
-            .c2AckUrl(properties.getProperty(C2NiFiProperties.C2_REST_URL_ACK_KEY, ""))
-            .truststoreFilename(properties.getProperty(C2NiFiProperties.TRUSTSTORE_LOCATION_KEY, ""))
-            .truststorePassword(properties.getProperty(C2NiFiProperties.TRUSTSTORE_PASSWORD_KEY, ""))
-            .truststoreType(properties.getProperty(C2NiFiProperties.TRUSTSTORE_TYPE_KEY, "JKS"))
-            .keystoreFilename(properties.getProperty(C2NiFiProperties.KEYSTORE_LOCATION_KEY, ""))
-            .keystorePassword(properties.getProperty(C2NiFiProperties.KEYSTORE_PASSWORD_KEY, ""))
-            .keystoreType(properties.getProperty(C2NiFiProperties.KEYSTORE_TYPE_KEY, "JKS"))
+            .agentClass(properties.getProperty(C2_AGENT_CLASS.getKey(), C2_AGENT_CLASS.getDefaultValue()))
+            .agentIdentifier(properties.getProperty(C2_AGENT_IDENTIFIER.getKey()))
+            .fullHeartbeat(Boolean.parseBoolean(properties.getProperty(C2_FULL_HEARTBEAT.getKey(), C2_FULL_HEARTBEAT.getDefaultValue())))
+            .heartbeatPeriod(Long.parseLong(properties.getProperty(C2_AGENT_HEARTBEAT_PERIOD.getKey(),
+                C2_AGENT_HEARTBEAT_PERIOD.getDefaultValue())))
+            .connectTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CONNECTION_TIMEOUT.getKey(),
+                C2_REST_CONNECTION_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+            .readTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_READ_TIMEOUT.getKey(),
+                C2_REST_READ_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+            .callTimeout((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_REST_CALL_TIMEOUT.getKey(),
+                C2_REST_CALL_TIMEOUT.getDefaultValue()), TimeUnit.MILLISECONDS))
+            .maxIdleConnections(Integer.parseInt(properties.getProperty(C2_MAX_IDLE_CONNECTIONS.getKey(), C2_MAX_IDLE_CONNECTIONS.getDefaultValue())))
+            .keepAliveDuration((long) FormatUtils.getPreciseTimeDuration(properties.getProperty(C2_KEEP_ALIVE_DURATION.getKey(),
+                C2_KEEP_ALIVE_DURATION.getDefaultValue()), TimeUnit.MILLISECONDS))
+            .c2Url(properties.getProperty(C2_REST_URL.getKey(), C2_REST_URL.getDefaultValue()))
+            .c2RequestCompression(properties.getProperty(C2_REQUEST_COMPRESSION.getKey(), C2_REQUEST_COMPRESSION.getDefaultValue()))
+            .c2AssetDirectory(properties.getProperty(C2_ASSET_DIRECTORY.getKey(), C2_ASSET_DIRECTORY.getDefaultValue()))
+            .confDirectory(properties.getProperty(C2_CONFIG_DIRECTORY.getKey(), C2_CONFIG_DIRECTORY.getDefaultValue()))
+            .runtimeManifestIdentifier(properties.getProperty(C2_RUNTIME_MANIFEST_IDENTIFIER.getKey(), C2_RUNTIME_MANIFEST_IDENTIFIER.getDefaultValue()))
+            .runtimeType(properties.getProperty(C2_RUNTIME_TYPE.getKey(), C2_RUNTIME_TYPE.getDefaultValue()))
+            .c2AckUrl(properties.getProperty(C2_REST_URL_ACK.getKey(), C2_REST_URL_ACK.getDefaultValue()))
+            .truststoreFilename(properties.getProperty(C2_SECURITY_TRUSTSTORE_LOCATION.getKey(), C2_SECURITY_TRUSTSTORE_LOCATION.getDefaultValue()))
+            .truststorePassword(properties.getProperty(C2_SECURITY_TRUSTSTORE_PASSWORD.getKey(), C2_SECURITY_TRUSTSTORE_PASSWORD.getDefaultValue()))
+            .truststoreType(properties.getProperty(C2_SECURITY_TRUSTSTORE_TYPE.getKey(), C2_SECURITY_TRUSTSTORE_TYPE.getDefaultValue()))
+            .keystoreFilename(properties.getProperty(C2_SECURITY_KEYSTORE_LOCATION.getKey(), C2_SECURITY_KEYSTORE_LOCATION.getDefaultValue()))
+            .keystorePassword(properties.getProperty(C2_SECURITY_KEYSTORE_PASSWORD.getKey(), C2_SECURITY_KEYSTORE_PASSWORD.getDefaultValue()))
+            .keystoreType(properties.getProperty(C2_SECURITY_KEYSTORE_TYPE.getKey(), C2_SECURITY_KEYSTORE_TYPE.getDefaultValue()))
             .build();
     }
 
     public void start() {
-        scheduledExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+        handleOngoingOperations();
+        heartbeatExecutorService.scheduleAtFixedRate(() -> c2ClientService.sendHeartbeat(generateRuntimeInfo()), INITIAL_DELAY, heartbeatPeriod, TimeUnit.MILLISECONDS);
+    }
+
+    // need to be synchronized to prevent parallel run coming from acknowledgeHandler/ackTimeoutTask
+    private synchronized void handleOngoingOperations() {
+        Optional<OperationQueue> operationQueue = requestedOperationDAO.load();
+        LOGGER.info("Handling ongoing operations: {}", operationQueue);
+        if (operationQueue.isPresent()) {
+            try {
+                waitForAcknowledgeFromBootstrap();
+                c2ClientService.handleRequestedOperations(operationQueue.get().getRemainingOperations());
+            } catch (Exception e) {
+                LOGGER.error("Failed to process c2 operations queue", e);
+                c2ClientService.enableHeartbeat();
+            }
+        } else {
+            c2ClientService.enableHeartbeat();
+        }
+    }
+
+    private void waitForAcknowledgeFromBootstrap() {
+        LOGGER.info("Waiting for ACK signal from Bootstrap");
+        int currentWaitTime = 0;
+        while(!ackReceived) {
+            try {
+                Thread.sleep(IS_ACK_RECEIVED_POLL_INTERVAL);
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread interrupted while waiting for Acknowledge");
+            }
+            currentWaitTime += IS_ACK_RECEIVED_POLL_INTERVAL;
+            if (MAX_WAIT_FOR_BOOTSTRAP_ACK_MS <= currentWaitTime) {
+                LOGGER.warn("Max wait time ({}) exceeded for waiting ack from bootstrap, skipping", MAX_WAIT_FOR_BOOTSTRAP_ACK_MS);
+                break;
+            }
+        }
+    }
+
+    private void registerOperation(C2Operation c2Operation) {
+        try {
+            ackReceived = false;
+            registerAcknowledgeTimeoutTask(c2Operation);
+            String command = Optional.ofNullable(c2Operation.getOperand())
+                .map(operand -> c2Operation.getOperation().name() + "_" + operand.name())
+                .orElse(c2Operation.getOperation().name());
+            bootstrapCommunicator.sendCommand(command);
+        } catch (IOException e) {
+            LOGGER.error("Failed to send operation to bootstrap", e);
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private void registerAcknowledgeTimeoutTask(C2Operation c2Operation) {
+        bootstrapAcknowledgeExecutorService.schedule(() -> {
+            if (!ackReceived) {
+                LOGGER.info("Operation requiring restart is failed, and no restart/acknowledge is happened after {} seconds for {}. Handling remaining operations.",
+                    MINIFI_RESTART_TIMEOUT_SECONDS, c2Operation);
+                handleOngoingOperations();
+            }
+        }, MINIFI_RESTART_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    }
+
+    private void acknowledgeHandler(String[] params) {
+        LOGGER.info("Received acknowledge message from bootstrap process");
+        if (params.length < 1) {
+            LOGGER.error("Invalid arguments coming from bootstrap, skipping acknowledging latest operation");
+            return;
+        }
+
+        Optional<OperationQueue> optionalOperationQueue = requestedOperationDAO.load();
+        ackReceived = true;
+        if (optionalOperationQueue.isPresent()) {
+            OperationQueue operationQueue = optionalOperationQueue.get();
+            C2Operation c2Operation = operationQueue.getCurrentOperation();
+            C2OperationAck c2OperationAck = new C2OperationAck();
+            c2OperationAck.setOperationId(c2Operation.getIdentifier());
+            C2OperationState c2OperationState = new C2OperationState();
+            MiNiFiCommandState miNiFiCommandState = MiNiFiCommandState.valueOf(params[0]);
+            OperationState state = OPERATION_STATE_MAP.get(miNiFiCommandState);
+            c2OperationState.setState(state);
+            c2OperationAck.setOperationState(c2OperationState);
+            c2ClientService.sendAcknowledge(c2OperationAck);
+            if (MiNiFiCommandState.NO_OPERATION == miNiFiCommandState || MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART == miNiFiCommandState) {
+                LOGGER.debug("No restart happened because of an error / the app was already in the desired state");
+                handleOngoingOperations();
+            }
+        } else {
+            LOGGER.error("Can not send acknowledge due to empty Operation Queue");
+        }
     }
 
     public void stop() {
         try {
-            scheduledExecutorService.shutdown();
-            scheduledExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS);
+            heartbeatExecutorService.shutdown();
+            heartbeatExecutorService.awaitTermination(TERMINATION_WAIT, TimeUnit.MILLISECONDS);
         } catch (InterruptedException ignore) {
-            logger.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService");
+            LOGGER.info("Stopping C2 Client's thread was interrupted but shutting down anyway the C2NifiClientService");
         }
     }
 
@@ -215,14 +364,16 @@ public class C2NifiClientService {
     }
 
     private boolean updateFlowContent(byte[] updateContent) {
-        logger.debug("Update content: \n{}", new String(updateContent, StandardCharsets.UTF_8));
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Update content: \n{}", new String(updateContent, StandardCharsets.UTF_8));
+        }
         Path path = getTargetConfigFile().toPath();
         try {
             Files.write(getTargetConfigFile().toPath(), updateContent);
-            logger.info("Updated configuration was written to: {}", path);
+            LOGGER.info("Updated configuration was written to: {}", path);
             return true;
         } catch (IOException e) {
-            logger.error("Configuration update failed. File creation was not successful targeting: {}", path, e);
+            LOGGER.error("Configuration update failed. File creation was not successful targeting: {}", path, e);
             return false;
         }
     }
@@ -232,6 +383,15 @@ public class C2NifiClientService {
             .map(File::new)
             .map(File::getParent)
             .map(parentDir -> new File(parentDir + TARGET_CONFIG_FILE))
-            .orElse(new File(DEFAULT_CONF_DIR + TARGET_CONFIG_FILE));
+            .orElse(new File(CONF_DIR.getDefaultValue() + TARGET_CONFIG_FILE));
+    }
+
+    private static Map<MiNiFiCommandState, OperationState> getOperationStateMap() {
+        Map<MiNiFiCommandState, OperationState> operationStateMapping = new HashMap<>();
+        operationStateMapping.put(MiNiFiCommandState.FULLY_APPLIED, OperationState.FULLY_APPLIED);
+        operationStateMapping.put(MiNiFiCommandState.NO_OPERATION, OperationState.NO_OPERATION);
+        operationStateMapping.put(MiNiFiCommandState.NOT_APPLIED_WITH_RESTART, OperationState.NOT_APPLIED);
+        operationStateMapping.put(MiNiFiCommandState.NOT_APPLIED_WITHOUT_RESTART, OperationState.NOT_APPLIED);
+        return Collections.unmodifiableMap(operationStateMapping);
     }
 }
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
new file mode 100644
index 0000000000..80b1418c95
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAO.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.client.service.operation.RequestedOperationDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileBasedRequestedOperationDAO implements RequestedOperationDAO {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedRequestedOperationDAO.class);
+    protected static final String REQUESTED_OPERATIONS_FILE_NAME = "requestedOperations.data";
+
+    private final ObjectMapper objectMapper;
+    private final File requestedOperationsFile;
+
+    public FileBasedRequestedOperationDAO(String runDir, ObjectMapper objectMapper) {
+        this.requestedOperationsFile = new File(runDir, REQUESTED_OPERATIONS_FILE_NAME);
+        this.objectMapper = objectMapper;
+    }
+
+    public void save(OperationQueue operationQueue) {
+        LOGGER.info("Saving C2 operations to file");
+        LOGGER.debug("C2 Operation Queue: {}", operationQueue);
+        try {
+            objectMapper.writeValue(requestedOperationsFile, operationQueue);
+        } catch (Exception e) {
+            LOGGER.error("Failed to save requested c2 operations", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Optional<OperationQueue> load() {
+        LOGGER.info("Reading queued c2 operations from file");
+        if (requestedOperationsFile.exists()) {
+            try {
+                OperationQueue operationQueue = objectMapper.readValue(requestedOperationsFile, OperationQueue.class);
+                LOGGER.debug("Queued operations: {}", operationQueue);
+                return Optional.of(operationQueue);
+            } catch (Exception e) {
+                LOGGER.error("Failed to read queued operations file", e);
+            }
+        } else {
+            LOGGER.info("There is no queued c2 operation");
+        }
+        return Optional.empty();
+    }
+
+    public void cleanup() {
+        if (requestedOperationsFile.exists() && !requestedOperationsFile.delete()) {
+            LOGGER.error("Failed to delete requested operations file {}, it should be deleted manually", requestedOperationsFile);
+        }
+    }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java
new file mode 100644
index 0000000000..173e3fc52d
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/AgentPropertyValidationContext.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.expression.ExpressionLanguageCompiler;
+
+public class AgentPropertyValidationContext implements ValidationContext {
+
+        @Override
+        public boolean isExpressionLanguageSupported(String propertyName) {
+            return false;
+        }
+
+        @Override
+        public Map<PropertyDescriptor, String> getProperties() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Map<String, String> getAllProperties() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ControllerServiceLookup getControllerServiceLookup() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ValidationContext getControllerServiceValidationContext(ControllerService controllerService) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public PropertyValue newPropertyValue(String value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String getAnnotationData() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isValidationRequired(ControllerService service) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isExpressionLanguagePresent(String value) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String getProcessGroupIdentifier() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public Collection<String> getReferencedParameters(String propertyName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isParameterDefined(String parameterName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isParameterSet(String parameterName) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isDependencySatisfied(PropertyDescriptor propertyDescriptor, Function<String, PropertyDescriptor> propertyDescriptorLookup) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public PropertyValue getProperty(PropertyDescriptor descriptor) {
+            throw new UnsupportedOperationException();
+        }
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java
new file mode 100644
index 0000000000..3e68572b9b
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/PropertiesPersister.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PropertiesPersister {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesPersister.class);
+    private static final String VALID = "VALID";
+    private static final String EQUALS_SIGN = "=";
+    private static final String HASHMARK_SIGN = "#";
+
+    private final UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private final AgentPropertyValidationContext validationContext;
+    private final File bootstrapFile;
+    private final File bootstrapNewFile;
+
+    public PropertiesPersister(UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider, String bootstrapConfigFileLocation) {
+        this.updatePropertiesPropertyProvider = updatePropertiesPropertyProvider;
+        this.validationContext = new AgentPropertyValidationContext();
+        this.bootstrapFile = new File(bootstrapConfigFileLocation);
+        this.bootstrapNewFile = new File(bootstrapFile.getParentFile() + "/" + BOOTSTRAP_UPDATED_FILE_NAME);
+    }
+
+    public Boolean persistProperties(Map<String, String> propertiesToUpdate) {
+        int propertyCountToUpdate = validateProperties(propertiesToUpdate);
+        if (propertyCountToUpdate == 0) {
+            return false;
+        }
+        Set<String> propertiesToUpdateKeys = new HashSet<>(propertiesToUpdate.keySet());
+
+        Set<String> updatedProperties = new HashSet<>();
+        try (BufferedReader reader = new BufferedReader(new FileReader(bootstrapFile));
+            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(bootstrapNewFile, false))) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                for (String key : propertiesToUpdateKeys) {
+                    String prefix = key + EQUALS_SIGN;
+                    if (line.startsWith(prefix) || line.startsWith(HASHMARK_SIGN + prefix)) {
+                        line = prefix + propertiesToUpdate.get(key);
+                        updatedProperties.add(key);
+                    }
+                }
+                bufferedWriter.write(line + System.lineSeparator());
+            }
+
+            // add new properties which has no values before
+            propertiesToUpdateKeys.removeAll(updatedProperties);
+            for (String key : propertiesToUpdateKeys) {
+                bufferedWriter.write(key + EQUALS_SIGN + propertiesToUpdate.get(key) + System.lineSeparator());
+            }
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+
+        return true;
+    }
+
+    private int validateProperties(Map<String, String> propertiesToUpdate) {
+        Set<UpdatableProperty> updatableProperties = (Set<UpdatableProperty>) updatePropertiesPropertyProvider.getProperties().get(AVAILABLE_PROPERTIES);
+        Map<String, UpdatableProperty> updatablePropertyMap = updatableProperties.stream().collect(Collectors.toMap(UpdatableProperty::getPropertyName, Function.identity()));
+        int propertyCountToUpdate = 0;
+        List<String> validationErrors = new ArrayList<>();
+        for (Map.Entry<String, String> entry : propertiesToUpdate.entrySet()) {
+            UpdatableProperty updatableProperty = updatablePropertyMap.get(entry.getKey());
+            if (updatableProperty == null) {
+                validationErrors.add(String.format("You can not update the {} property through C2 protocol", entry.getKey()));
+                continue;
+            }
+            if (!Objects.equals(updatableProperty.getPropertyValue(), entry.getValue())) {
+                if (!getValidator(updatableProperty.getValidator())
+                    .map(validator -> validator.validate(entry.getKey(), entry.getValue(), validationContext))
+                    .map(ValidationResult::isValid)
+                    .orElse(true)) {
+                    validationErrors.add(String.format("Invalid value for %s", entry.getKey()));
+                    continue;
+                }
+                propertyCountToUpdate++;
+            }
+        }
+        if (!validationErrors.isEmpty()) {
+            throw new IllegalArgumentException("The following validation errors happened during property update:\\n" + String.join("\\n", validationErrors));
+        }
+        return propertyCountToUpdate;
+    }
+
+    private Optional<Validator> getValidator(String validatorName) {
+        try {
+            Field validatorField = StandardValidators.class.getField(validatorName);
+            return Optional.of((Validator) validatorField.get(null));
+        } catch (NoSuchFieldException e) {
+            if (!VALID.equals(validatorName)) {
+                LOGGER.warn("No validator present: {}", validatorName);
+            }
+        } catch (IllegalAccessException e) {
+            LOGGER.error("Illegal access of {}", validatorName);
+        }
+        return Optional.empty();
+    }
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java
new file mode 100644
index 0000000000..7850efb4db
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatableProperty.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class UpdatableProperty implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String propertyName;
+    private final String propertyValue;
+    private final String validator;
+
+    public UpdatableProperty(String propertyName, String propertyValue, String validator) {
+        this.propertyName = propertyName;
+        this.propertyValue = propertyValue;
+        this.validator = validator;
+    }
+
+    public String getPropertyName() {
+        return propertyName;
+    }
+
+    public String getPropertyValue() {
+        return propertyValue;
+    }
+
+    public String getValidator() {
+        return validator;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        UpdatableProperty that = (UpdatableProperty) o;
+        return Objects.equals(propertyName, that.propertyName) && Objects.equals(propertyValue, that.propertyValue) && Objects.equals(validator,
+            that.validator);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(propertyName, propertyValue, validator);
+    }
+
+    @Override
+    public String toString() {
+        return "UpdatableProperty{" +
+            "propertyName='" + propertyName + '\'' +
+            ", propertyValue='" + propertyValue + '\'' +
+            ", validator='" + validator + '\'' +
+            '}';
+    }
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java
new file mode 100644
index 0000000000..7faece8daf
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/main/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.c2.client.service.operation.OperandPropertiesProvider;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdatePropertiesPropertyProvider implements OperandPropertiesProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePropertiesPropertyProvider.class);
+    protected static final String AVAILABLE_PROPERTIES = "availableProperties";
+
+    private final String bootstrapConfigFileLocation;
+
+    public UpdatePropertiesPropertyProvider(String bootstrapConfigFileLocation) {
+        this.bootstrapConfigFileLocation = bootstrapConfigFileLocation;
+    }
+
+    @Override
+    public Map<String, Object> getProperties() {
+        Map<String, String> bootstrapProperties = getBootstrapProperties();
+
+        LinkedHashSet<UpdatableProperty> updatableProperties = PROPERTIES_BY_KEY.values()
+            .stream()
+            .filter(property -> !property.isSensitive())
+            .filter(MiNiFiProperties::isModifiable)
+            .map(property -> new UpdatableProperty(property.getKey(), bootstrapProperties.get(property.getKey()), property.getValidator().name()))
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        return Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.unmodifiableSet(updatableProperties));
+    }
+
+    private Map<String, String> getBootstrapProperties() {
+        Properties props = new Properties();
+
+        File bootstrapFile = new File(bootstrapConfigFileLocation);
+        try (FileInputStream fis = new FileInputStream(bootstrapFile)) {
+            props.load(fis);
+        } catch (FileNotFoundException e) {
+            LOGGER.error("The bootstrap configuration file " + bootstrapConfigFileLocation + " doesn't exists", e);
+        } catch (IOException e) {
+            LOGGER.error("Failed to load properties from " + bootstrapConfigFileLocation, e);
+        }
+        return props.entrySet().stream()
+            .collect(Collectors.toMap(entry -> (String) entry.getKey(), entry -> (String) entry.getValue()));
+    }
+
+
+
+}
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
new file mode 100644
index 0000000000..962e98b2e9
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/FileBasedRequestedOperationDAOTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2;
+
+import static org.apache.nifi.minifi.c2.FileBasedRequestedOperationDAO.REQUESTED_OPERATIONS_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.client.service.operation.OperationQueue;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class FileBasedRequestedOperationDAOTest {
+
+    @Mock
+    private ObjectMapper objectMapper;
+
+    @TempDir
+    File tmpDir;
+
+    private FileBasedRequestedOperationDAO fileBasedRequestedOperationDAO;
+
+    @BeforeEach
+    void setup() {
+        fileBasedRequestedOperationDAO = new FileBasedRequestedOperationDAO(tmpDir.getAbsolutePath(), objectMapper);
+    }
+
+    @Test
+    void shouldSaveRequestedOperationsToFile() throws IOException {
+        OperationQueue operationQueue = getOperationQueue();
+        fileBasedRequestedOperationDAO.save(operationQueue);
+
+        verify(objectMapper).writeValue(any(File.class), eq(operationQueue));
+    }
+
+    @Test
+    void shouldThrowRuntimeExceptionWhenExceptionHappensDuringSave() throws IOException {
+        doThrow(new RuntimeException()).when(objectMapper).writeValue(any(File.class), anyList());
+
+        assertThrows(RuntimeException.class, () -> fileBasedRequestedOperationDAO.save(mock(OperationQueue.class)));
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenFileDoesntExists() {
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+    }
+
+    @Test
+    void shouldGetReturnEmptyWhenExceptionHappens() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        doThrow(new RuntimeException()).when(objectMapper).readValue(any(File.class), eq(OperationQueue.class));
+
+        assertEquals(Optional.empty(), fileBasedRequestedOperationDAO.load());
+    }
+
+    @Test
+    void shouldGetRequestedOperations() throws IOException {
+        new File(tmpDir.getAbsolutePath() + "/" + REQUESTED_OPERATIONS_FILE_NAME).createNewFile();
+
+        OperationQueue operationQueue = getOperationQueue();
+        when(objectMapper.readValue(any(File.class), eq(OperationQueue.class))).thenReturn(operationQueue);
+
+        assertEquals(Optional.of(operationQueue), fileBasedRequestedOperationDAO.load());
+    }
+
+    private OperationQueue getOperationQueue() {
+        C2Operation c2Operation = new C2Operation();
+        c2Operation.setIdentifier("id");
+        c2Operation.setOperation(OperationType.TRANSFER);
+        c2Operation.setOperand(OperandType.DEBUG);
+        c2Operation.setArgs(Collections.singletonMap("key", "value"));
+
+        C2Operation currentOperation = new C2Operation();
+        currentOperation.setIdentifier("id2");
+
+        return new OperationQueue(currentOperation, Collections.singletonList(c2Operation));
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java
new file mode 100644
index 0000000000..e1d68df805
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/PropertiesPersisterTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.C2_ENABLE;
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.apache.nifi.minifi.commons.api.MiNiFiConstants.BOOTSTRAP_UPDATED_FILE_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class PropertiesPersisterTest {
+
+    private static final String EXTRA_PROPERTY_KEY = "anExtraPropertyWhichShouldNotBeModified";
+    private static final String EXTRA_PROPERTY_VALUE = "propertyValue";
+    private static final String FALSE = "false";
+    private static final String TRUE = "true";
+    private static final String BOOLEAN_VALIDATOR = "BOOLEAN_VALIDATOR";
+    private static final String UNKNOWN = "unknown";
+    private static final String VALID = "VALID";
+
+    @Mock
+    private UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    @TempDir
+    private File tempDir;
+
+    private PropertiesPersister propertiesPersister;
+    private String bootstrapConfigFileLocation;
+    private String bootstrapNewConfigFileLocation;
+
+    @BeforeEach
+    void setup() {
+        bootstrapConfigFileLocation = tempDir.getAbsolutePath() + "/bootstrap.conf";
+        bootstrapNewConfigFileLocation = tempDir.getAbsolutePath() + "/" + BOOTSTRAP_UPDATED_FILE_NAME;
+        propertiesPersister = new PropertiesPersister(updatePropertiesPropertyProvider, bootstrapConfigFileLocation);
+    }
+
+    @Test
+    void shouldPersistPropertiesThrowIllegalArgumentExceptionIfParameterContainsUnknownProperty() {
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty("propertyName",
+            EXTRA_PROPERTY_VALUE, VALID))));
+        assertThrows(IllegalArgumentException.class, () -> propertiesPersister.persistProperties(Collections.singletonMap(UNKNOWN, UNKNOWN)));
+    }
+
+    @Test
+    void shouldPersistPropertiesThrowIllegalArgumentExceptionIfParameterContainsInvalidPropertyValue() {
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(), null,
+            BOOLEAN_VALIDATOR))));
+        assertThrows(IllegalArgumentException.class, () ->
+            propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), UNKNOWN)));
+    }
+
+    @Test
+    void shouldPersistPropertiesReturnFalseIfThereIsNoNewPropertyValueInParameter() {
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+            TRUE, BOOLEAN_VALIDATOR))));
+
+        assertFalse(propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), TRUE)));
+    }
+
+    @Test
+    void shouldAddNewLinesForPropertiesWhichDidNotExistsBeforeInBootstrapConf() throws IOException {
+        new File(bootstrapConfigFileLocation).createNewFile();
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+            TRUE, BOOLEAN_VALIDATOR))));
+
+        propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+        Properties properties = readUpdatedProperties();
+        assertEquals(1, properties.stringPropertyNames().size());
+        assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+    }
+
+    @Test
+    void shouldModifyPropertiesForPreviouslyCommentedLines() {
+        writeBootstrapFile("#" + C2_ENABLE.getKey() + "=" + TRUE);
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+            TRUE, BOOLEAN_VALIDATOR))));
+
+        propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+        Properties properties = readUpdatedProperties();
+        assertEquals(2, properties.stringPropertyNames().size());
+        assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+        assertEquals(EXTRA_PROPERTY_VALUE, properties.getProperty(EXTRA_PROPERTY_KEY));
+    }
+
+    @Test
+    void shouldModifyProperties() {
+        writeBootstrapFile(C2_ENABLE.getKey() + "=" + TRUE);
+        when(updatePropertiesPropertyProvider.getProperties()).thenReturn(Collections.singletonMap(AVAILABLE_PROPERTIES, Collections.singleton(new UpdatableProperty(C2_ENABLE.getKey(),
+            TRUE, BOOLEAN_VALIDATOR))));
+
+        propertiesPersister.persistProperties(Collections.singletonMap(C2_ENABLE.getKey(), FALSE));
+
+        Properties properties = readUpdatedProperties();
+        assertEquals(2, properties.stringPropertyNames().size());
+        assertEquals(FALSE, properties.getProperty(C2_ENABLE.getKey()));
+        assertEquals(EXTRA_PROPERTY_VALUE, properties.getProperty(EXTRA_PROPERTY_KEY));
+    }
+
+    private void writeBootstrapFile(String property) {
+        try(BufferedWriter writer = new BufferedWriter(new FileWriter(bootstrapConfigFileLocation))) {
+            writer.write(property + System.lineSeparator());
+            writer.write(EXTRA_PROPERTY_KEY + "=" + EXTRA_PROPERTY_VALUE);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Properties readUpdatedProperties() {
+        Properties props = new Properties();
+        try (FileInputStream fis = new FileInputStream(bootstrapNewConfigFileLocation)) {
+            props.load(fis);
+        } catch (Exception e) {
+            fail("Failed to read bootstrap file");
+        }
+        return props;
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java
new file mode 100644
index 0000000000..6923e29734
--- /dev/null
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/c2/command/UpdatePropertiesPropertyProviderTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.c2.command;
+
+import static org.apache.nifi.minifi.MiNiFiProperties.GRACEFUL_SHUTDOWN_SECOND;
+import static org.apache.nifi.minifi.MiNiFiProperties.JAVA;
+import static org.apache.nifi.minifi.MiNiFiProperties.NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD;
+import static org.apache.nifi.minifi.MiNiFiProperties.PROPERTIES_BY_KEY;
+import static org.apache.nifi.minifi.c2.command.UpdatePropertiesPropertyProvider.AVAILABLE_PROPERTIES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.nifi.minifi.MiNiFiProperties;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class UpdatePropertiesPropertyProviderTest {
+
+    @TempDir
+    private File tmpDir;
+
+    private UpdatePropertiesPropertyProvider updatePropertiesPropertyProvider;
+    private String bootstrapConfigFileLocation;
+
+    @BeforeEach
+    void setup() {
+        bootstrapConfigFileLocation = tmpDir + "/bootstrap.conf";
+        updatePropertiesPropertyProvider = new UpdatePropertiesPropertyProvider(bootstrapConfigFileLocation);
+    }
+
+    @Test
+    void shouldReturnModifiableNonSensitivePropertiesWithValues() throws IOException {
+        Properties props = new Properties();
+        props.setProperty(JAVA.getKey(), "java");
+        props.setProperty(GRACEFUL_SHUTDOWN_SECOND.getKey(), "20");
+        props.setProperty(NIFI_MINIFI_SECURITY_KEYSTORE_PASSWD.getKey(), "truststore");
+
+        try (FileOutputStream fos = new FileOutputStream(bootstrapConfigFileLocation)) {
+            props.store(fos, null);
+        }
+
+        Map<String, Object> result = updatePropertiesPropertyProvider.getProperties();
+
+        LinkedHashSet<UpdatableProperty> expected = getUpdatableProperties(props);
+
+        assertEquals(Collections.singletonMap(AVAILABLE_PROPERTIES, expected), result);
+    }
+
+    @Test
+    void shouldGetReturnListWithEmptyValuesInCaseOfFileNotFoundException() {
+        Map<String, Object> properties = updatePropertiesPropertyProvider.getProperties();
+
+        assertEquals(Collections.singletonMap(AVAILABLE_PROPERTIES, getUpdatableProperties(new Properties())), properties);
+    }
+
+    private static LinkedHashSet<UpdatableProperty> getUpdatableProperties(Properties props) {
+        return PROPERTIES_BY_KEY.values()
+            .stream()
+            .filter(property -> !property.isSensitive())
+            .filter(MiNiFiProperties::isModifiable)
+            .map(property -> new UpdatableProperty(property.getKey(), (String) props.get(property.getKey()), property.getValidator().name()))
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }
+}
\ No newline at end of file
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
index b1925b6bad..57653e7773 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/conf/bootstrap.conf
@@ -161,7 +161,3 @@ java.arg.14=-Djava.awt.headless=true
 #c2.security.keystore.password=
 #c2.security.keystore.type=JKS
 #c2.request.compression=none
-# The following ingestor configuration needs to be enabled in order to apply configuration updates coming from C2 server
-#nifi.minifi.notifier.ingestors=org.apache.nifi.minifi.bootstrap.configuration.ingestors.FileChangeIngestor
-#nifi.minifi.notifier.ingestors.file.config.path=./conf/config-new.yml
-#nifi.minifi.notifier.ingestors.file.polling.period.seconds=5
diff --git a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
index 16aeba7871..62c0f00c17 100644
--- a/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
+++ b/minifi/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-server/src/main/java/org/apache/nifi/minifi/StandardMiNiFiServer.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import org.apache.nifi.headless.HeadlessNiFiServer;
 import org.apache.nifi.minifi.bootstrap.BootstrapListener;
-import org.apache.nifi.minifi.c2.C2NiFiProperties;
 import org.apache.nifi.minifi.c2.C2NifiClientService;
 import org.apache.nifi.minifi.commons.status.FlowStatusReport;
 import org.apache.nifi.minifi.status.StatusConfigReporter;
@@ -55,8 +54,8 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
 
         initBootstrapListener();
         initC2();
-
         sendStartedStatus();
+        startHeartbeat();
     }
 
     @Override
@@ -79,16 +78,21 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
     }
 
     private void initC2() {
-        if (Boolean.parseBoolean(props.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"))) {
+        if (Boolean.parseBoolean(props.getProperty(MiNiFiProperties.C2_ENABLE.getKey(), MiNiFiProperties.C2_ENABLE.getDefaultValue()))) {
             logger.info("C2 enabled, creating a C2 client instance");
-            c2NifiClientService = new C2NifiClientService(props, flowController);
-            c2NifiClientService.start();
+            c2NifiClientService = new C2NifiClientService(props, flowController, bootstrapListener);
         } else {
-            logger.debug("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);
+            logger.debug("C2 Property [{}] missing or disabled: C2 client not created", MiNiFiProperties.C2_ENABLE.getKey());
             c2NifiClientService = null;
         }
     }
 
+    private void startHeartbeat() {
+        if (c2NifiClientService != null) {
+            c2NifiClientService.start();
+        }
+    }
+
     private void initBootstrapListener() {
         String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
         if (bootstrapPort != null) {
@@ -101,7 +105,9 @@ public class StandardMiNiFiServer extends HeadlessNiFiServer implements MiNiFiSe
 
                 bootstrapListener = new BootstrapListener(this, port);
                 bootstrapListener.start();
-            } catch (NumberFormatException | IOException nfe) {
+            } catch(IOException e){
+                throw new UncheckedIOException("Failed to start MiNiFi because of Bootstrap listener initialization error", e);
+            } catch (NumberFormatException e) {
                 throw new RuntimeException("Failed to start MiNiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
             }
         } else {