You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/06/28 17:02:55 UTC

[nifi] branch main updated: NIFI-9908 C2 refactor and test coverage improvements

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1465c2c629 NIFI-9908 C2 refactor and test coverage improvements
1465c2c629 is described below

commit 1465c2c629e1a56c354292db7859e6a4074dff59
Author: Csaba Bejan <be...@gmail.com>
AuthorDate: Thu Jun 23 12:07:49 2022 +0200

    NIFI-9908 C2 refactor and test coverage improvements
    
    This closes #6149
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 c2/c2-client-bundle/c2-client-http/pom.xml         |   6 +
 .../apache/nifi/c2/client/http/C2HttpClient.java   |  76 +++++-----
 .../nifi/c2/client/http/C2ServerException.java     |  29 ++++
 .../nifi/c2/client/http/C2HttpClientTest.java      | 156 +++++++++++++++++++++
 .../nifi/c2/client/service/C2ClientService.java    |   9 +-
 .../UpdateConfigurationOperationHandler.java       |  13 +-
 .../c2/client/service/C2ClientServiceTest.java     | 146 +++++++++++++++++++
 .../c2/client/service/C2HeartbeatFactoryTest.java  | 103 ++++++++++++++
 .../service/operation/C2OperationServiceTest.java  | 114 +++++++++++++++
 .../UpdateConfigurationOperationHandlerTest.java   | 119 ++++++++++++++++
 .../org/apache/nifi/c2/C2NifiClientService.java    |  14 +-
 .../nifi/controller/StandardFlowService.java       |   2 +-
 12 files changed, 729 insertions(+), 58 deletions(-)

diff --git a/c2/c2-client-bundle/c2-client-http/pom.xml b/c2/c2-client-bundle/c2-client-http/pom.xml
index 75076c051e..927c3621b3 100644
--- a/c2/c2-client-bundle/c2-client-http/pom.xml
+++ b/c2/c2-client-bundle/c2-client-http/pom.xml
@@ -47,5 +47,11 @@ limitations under the License.
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>logging-interceptor</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index a641ac6093..0d3b4e6cd3 100644
--- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -91,6 +91,44 @@ public class C2HttpClient implements C2Client {
         return serializer.serialize(heartbeat).flatMap(this::sendHeartbeat);
     }
 
+    @Override
+    public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
+        Optional<byte[]> updateContent = Optional.empty();
+        final Request.Builder requestBuilder = new Request.Builder()
+            .get()
+            .url(flowUpdateUrl);
+        final Request request = requestBuilder.build();
+
+        try (Response response = httpClientReference.get().newCall(request).execute()) {
+            Optional<ResponseBody> body = Optional.ofNullable(response.body());
+
+            if (!response.isSuccessful()) {
+                StringBuilder messageBuilder = new StringBuilder(String.format("Configuration retrieval failed: HTTP %d", response.code()));
+                body.map(Object::toString).ifPresent(messageBuilder::append);
+                throw new C2ServerException(messageBuilder.toString());
+            }
+
+            if (body.isPresent()) {
+                updateContent = Optional.of(body.get().bytes());
+            } else {
+                logger.warn("No body returned when pulling a new configuration");
+            }
+        } catch (Exception e) {
+            logger.warn("Configuration retrieval failed", e);
+        }
+
+        return updateContent;
+    }
+
+    @Override
+    public void acknowledgeOperation(C2OperationAck operationAck) {
+        logger.info("Acknowledging Operation [{}] C2 URL [{}]", operationAck.getOperationId(), clientConfig.getC2AckUrl());
+        serializer.serialize(operationAck)
+            .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
+            .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
+            .ifPresent(this::sendAck);
+    }
+
     private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
         Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
         Request request = new Request.Builder()
@@ -198,44 +236,6 @@ public class C2HttpClient implements C2Client {
         }
     }
 
-    @Override
-    public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
-        final Request.Builder requestBuilder = new Request.Builder()
-                .get()
-                .url(flowUpdateUrl);
-        final Request request = requestBuilder.build();
-
-        ResponseBody body;
-        try (final Response response = httpClientReference.get().newCall(request).execute()) {
-            int code = response.code();
-            if (code >= 400) {
-                final String message = String.format("Configuration retrieval failed: HTTP %d %s", code, response.body().string());
-                throw new IOException(message);
-            }
-
-            body = response.body();
-
-            if (body == null) {
-                logger.warn("No body returned when pulling a new configuration");
-                return Optional.empty();
-            }
-
-            return Optional.of(body.bytes());
-        } catch (Exception e) {
-            logger.warn("Configuration retrieval failed", e);
-            return Optional.empty();
-        }
-    }
-
-    @Override
-    public void acknowledgeOperation(C2OperationAck operationAck) {
-        logger.info("Performing acknowledgement request to {} for operation {}", clientConfig.getC2AckUrl(), operationAck.getOperationId());
-        serializer.serialize(operationAck)
-            .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
-            .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
-            .ifPresent(this::sendAck);
-    }
-
     private void sendAck(Request request) {
         try(Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
             if (!heartbeatResponse.isSuccessful()) {
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java
new file mode 100644
index 0000000000..984dd56956
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.http;
+
+import java.io.IOException;
+
+/**
+ * Exception to differentiate C2 specific issues from standard IOExceptions
+ */
+public class C2ServerException extends IOException {
+
+    public C2ServerException(String message) {
+        super(message);
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
new file mode 100644
index 0000000000..aec67b2f0e
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.http;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.serializer.C2Serializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2HttpClientTest {
+
+    private static final String HEARTBEAT_PATH = "c2/heartbeat";
+    private static final String UPDATE_PATH = "c2/update";
+    private static final String ACK_PATH = "c2/acknowledge";
+    private static final int HTTP_STATUS_OK = 200;
+    private static final int HTTP_STATUS_BAD_REQUEST = 400;
+
+    @Mock
+    private C2ClientConfig c2ClientConfig;
+
+    @Mock
+    private C2Serializer serializer;
+
+    @InjectMocks
+    private C2HttpClient c2HttpClient;
+
+    private MockWebServer mockWebServer;
+
+    private String baseUrl;
+
+    @BeforeEach
+    public void startServer() {
+        mockWebServer = new MockWebServer();
+        baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
+    }
+
+    @AfterEach
+    public void shutdownServer() throws IOException {
+        mockWebServer.shutdown();
+    }
+
+    @Test
+    void testPublishHeartbeatSuccess() throws InterruptedException {
+        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+        mockWebServer.enqueue(new MockResponse().setBody("responseBody"));
+
+        when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
+        when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse));
+        when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH);
+
+        Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
+
+        assertTrue(response.isPresent());
+        assertEquals(response.get(), hbResponse);
+
+        RecordedRequest request = mockWebServer.takeRequest();
+        assertEquals("/" + HEARTBEAT_PATH, request.getPath());
+    }
+
+    @Test
+    void testPublishHeartbeatReturnEmptyInCaseOfCommunicationIssue() {
+        when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
+        when(c2ClientConfig.getC2Url()).thenReturn("http://localhost/incorrectPath");
+
+        Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
+
+        assertFalse(response.isPresent());
+    }
+
+    @Test
+    void testConstructorThrowsExceptionForInvalidKeystoreFilenameAtInitialization() {
+        when(c2ClientConfig.getKeystoreFilename()).thenReturn("incorrectKeystoreFilename");
+
+        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> new C2HttpClient(c2ClientConfig, serializer));
+
+        assertTrue(exception.getMessage().contains("TLS"));
+    }
+
+    @Test
+    void testRetrieveUpdateContentReturnsEmptyWhenServerErrorResponse() throws InterruptedException {
+        mockWebServer.enqueue(new MockResponse().setBody("updateContent").setResponseCode(HTTP_STATUS_BAD_REQUEST));
+
+        Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
+
+        assertFalse(response.isPresent());
+
+        RecordedRequest request = mockWebServer.takeRequest();
+        assertEquals("/" + UPDATE_PATH, request.getPath());
+    }
+
+    @Test
+    void testRetrieveUpdateContentReturnsResponseWithBody() throws InterruptedException {
+        String content = "updateContent";
+        mockWebServer.enqueue(new MockResponse().setBody(content).setResponseCode(HTTP_STATUS_OK));
+
+        Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
+
+        assertTrue(response.isPresent());
+        assertArrayEquals(content.getBytes(StandardCharsets.UTF_8), response.get());
+
+        RecordedRequest request = mockWebServer.takeRequest();
+        assertEquals("/" + UPDATE_PATH, request.getPath());
+    }
+
+    @Test
+    void testAcknowledgeOperation() throws InterruptedException {
+        String ackContent = "ack";
+        when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH);
+        when(serializer.serialize(any(C2OperationAck.class))).thenReturn(Optional.of(ackContent));
+        mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_STATUS_OK));
+
+        c2HttpClient.acknowledgeOperation(new C2OperationAck());
+
+        RecordedRequest request = mockWebServer.takeRequest();
+        assertEquals("/" + ACK_PATH, request.getPath());
+        assertTrue(request.getHeader("Content-Type").contains("application/json"));
+        assertArrayEquals(ackContent.getBytes(StandardCharsets.UTF_8), request.getBody().readByteArray());
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
index a36b7bf3fa..c9424c67c2 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
@@ -16,13 +16,10 @@
  */
 package org.apache.nifi.c2.client.service;
 
-import java.util.Arrays;
 import java.util.List;
-import java.util.function.Function;
 import org.apache.nifi.c2.client.api.C2Client;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
 import org.apache.nifi.c2.client.service.operation.C2OperationService;
-import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
 import org.apache.nifi.c2.protocol.api.C2Heartbeat;
 import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
 import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -36,13 +33,11 @@ public class C2ClientService {
     private final C2Client client;
     private final C2HeartbeatFactory c2HeartbeatFactory;
     private final C2OperationService operationService;
-    private final UpdateConfigurationOperationHandler updateConfigurationOperationHandler;
 
-    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow) {
+    public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
         this.client = client;
         this.c2HeartbeatFactory = c2HeartbeatFactory;
-        this.updateConfigurationOperationHandler = new UpdateConfigurationOperationHandler(client, flowIdHolder, updateFlow);
-        this.operationService = new C2OperationService(Arrays.asList(updateConfigurationOperationHandler));
+        this.operationService = operationService;
     }
 
     public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
index b58e07547e..a597abe189 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
@@ -23,6 +23,8 @@ import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
 import java.net.URI;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.nifi.c2.client.api.C2Client;
 import org.apache.nifi.c2.client.service.FlowIdHolder;
 import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -36,8 +38,9 @@ import org.slf4j.LoggerFactory;
 public class UpdateConfigurationOperationHandler implements C2OperationHandler {
 
     private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationOperationHandler.class);
+    private static final Pattern FLOW_ID_PATTERN = Pattern.compile("/[^/]+?/[^/]+?/[^/]+?/([^/]+)?/?.*");
 
-    private static final String LOCATION = "location";
+    static final String LOCATION = "location";
 
     private final C2Client client;
     private final Function<byte[], Boolean> updateFlow;
@@ -101,10 +104,10 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
     private String parseFlowId(String flowUpdateUrl) {
         try {
             URI flowUri = new URI(flowUpdateUrl);
-            String flowUriPath = flowUri.getPath();
-            String[] split = flowUriPath.split("/");
-            if (split.length > 4) {
-                return split[4];
+            Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
+
+            if (matcher.matches()) {
+                return matcher.group(1);
             } else {
                 throw new IllegalArgumentException(String.format("Flow Update URL format unexpected [%s]", flowUpdateUrl));
             }
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
new file mode 100644
index 0000000000..37bd572dd1
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2ClientServiceTest {
+
+    @Mock
+    private C2Client client;
+
+    @Mock
+    private C2HeartbeatFactory c2HeartbeatFactory;
+
+    @Mock
+    private C2OperationService operationService;
+
+    @Mock
+    private RuntimeInfoWrapper runtimeInfoWrapper;
+
+    @InjectMocks
+    private C2ClientService c2ClientService;
+
+    @Test
+    void testSendHeartbeatAndAckWhenOperationPresent() {
+        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+        hbResponse.setRequestedOperations(generateOperation(1));
+        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+        when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(any());
+        verify(client).publishHeartbeat(heartbeat);
+        verify(operationService).handleOperation(any());
+        verify(client).acknowledgeOperation(any());
+    }
+
+    @Test
+    void testSendHeartbeatAndAckForMultipleOperationPresent() {
+        int operationNum = 5;
+        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+        hbResponse.setRequestedOperations(generateOperation(operationNum));
+        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+        when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(any());
+        verify(client).publishHeartbeat(heartbeat);
+        verify(operationService, times(operationNum)).handleOperation(any());
+        verify(client, times(operationNum)).acknowledgeOperation(any());
+    }
+
+    @Test
+    void testSendHeartbeatHandlesNoHeartbeatResponse() {
+        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(any());
+        verify(client).publishHeartbeat(heartbeat);
+        verify(operationService, times(0)).handleOperation(any());
+        verify(client, times(0)).acknowledgeOperation(any());
+    }
+
+    @Test
+    void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
+        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(any());
+        verify(client).publishHeartbeat(heartbeat);
+        verify(operationService, times(0)).handleOperation(any());
+        verify(client, times(0)).acknowledgeOperation(any());
+    }
+
+    @Test
+    void testSendHeartbeatNotAckWhenOperationAckMissing() {
+        C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+        when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+        C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+        hbResponse.setRequestedOperations(generateOperation(1));
+        when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+        when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+
+        c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+        verify(c2HeartbeatFactory).create(any());
+        verify(client).publishHeartbeat(heartbeat);
+        verify(operationService).handleOperation(any());
+        verify(client, times(0)).acknowledgeOperation(any());
+    }
+
+    private List<C2Operation> generateOperation(int num) {
+        return IntStream.range(0, num)
+            .mapToObj(x -> new C2Operation())
+            .collect(Collectors.toList());
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
new file mode 100644
index 0000000000..111d750db5
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.AgentRepositories;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2HeartbeatFactoryTest {
+
+    private static final String AGENT_CLASS = "agentClass";
+    private static final String FLOW_ID = "flowId";
+
+    @Mock
+    private C2ClientConfig clientConfig;
+
+    @Mock
+    private FlowIdHolder flowIdHolder;
+
+    @InjectMocks
+    private C2HeartbeatFactory c2HeartbeatFactory;
+
+    @TempDir
+    private File tempDir;
+
+    @BeforeEach
+    public void setup() {
+        when(clientConfig.getConfDirectory()).thenReturn(tempDir.getAbsolutePath());
+    }
+
+    @Test
+    void testCreateHeartbeat() {
+        when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+        when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS);
+
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
+
+        assertEquals(FLOW_ID, heartbeat.getFlowId());
+        assertEquals(AGENT_CLASS, heartbeat.getAgentClass());
+    }
+
+    @Test
+    void testCreateGeneratesAgentAndDeviceIdIfNotPresent() {
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
+
+        assertNotNull(heartbeat.getAgentId());
+        assertNotNull(heartbeat.getDeviceId());
+    }
+
+    @Test
+    void testCreatePopulatesFromRuntimeInfoWrapper() {
+        AgentRepositories repos = new AgentRepositories();
+        RuntimeManifest manifest = new RuntimeManifest();
+        Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+
+        C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
+
+        assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
+        assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
+        assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+    }
+
+    @Test
+    void testCreateThrowsExceptionWhenConfDirNotSet() {
+        when(clientConfig.getConfDirectory()).thenReturn(String.class.getSimpleName());
+
+        assertThrows(IllegalStateException.class, () -> c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class)));
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
new file mode 100644
index 0000000000..0249369e27
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class C2OperationServiceTest {
+
+    private static C2OperationAck operationAck;
+
+    @BeforeAll
+    public static void setup() {
+        operationAck = new C2OperationAck(); // shared ack, returned by TestDescribeOperationHandler#handle
+        operationAck.setOperationId("12345");
+    }
+
+    @Test
+    void testHandleOperationReturnsEmptyForUnrecognisedOperationType() {
+        C2Operation updateConfiguration = new C2Operation();
+        updateConfiguration.setOperation(OperationType.UPDATE);
+        updateConfiguration.setOperand(OperandType.CONFIGURATION);
+
+        C2OperationService withoutHandlers = new C2OperationService(Collections.emptyList());
+        Optional<C2OperationAck> result = withoutHandlers.handleOperation(updateConfiguration);
+
+        assertFalse(result.isPresent());
+    }
+
+    @Test
+    void testHandleOperation() {
+        C2Operation describeManifest = new C2Operation();
+        describeManifest.setOperation(OperationType.DESCRIBE);
+        describeManifest.setOperand(OperandType.MANIFEST);
+
+        C2OperationService withDescribeHandler = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
+        Optional<C2OperationAck> result = withDescribeHandler.handleOperation(describeManifest);
+
+        assertTrue(result.isPresent());
+        assertEquals(operationAck, result.get());
+    }
+
+    @Test
+    void testHandleOperationReturnsEmptyForOperandMismatch() {
+        C2Operation describeManifest = new C2Operation();
+        describeManifest.setOperation(OperationType.DESCRIBE);
+        describeManifest.setOperand(OperandType.MANIFEST);
+
+        C2OperationService withMismatchedHandler = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
+        Optional<C2OperationAck> result = withMismatchedHandler.handleOperation(describeManifest);
+
+        assertFalse(result.isPresent());
+    }
+
+    private static class TestDescribeOperationHandler implements C2OperationHandler {
+        // Registered for DESCRIBE + MANIFEST; acknowledges every operation with the shared operationAck.
+        @Override
+        public OperandType getOperandType() {
+            return OperandType.MANIFEST;
+        }
+
+        @Override
+        public OperationType getOperationType() {
+            return OperationType.DESCRIBE;
+        }
+
+        @Override
+        public C2OperationAck handle(C2Operation operation) {
+            return operationAck;
+        }
+    }
+
+    private static class TestInvalidOperationHandler implements C2OperationHandler {
+        // Registered for DESCRIBE + CONFIGURATION; handle yields null and is used by the operand-mismatch test.
+        @Override
+        public OperandType getOperandType() {
+            return OperandType.CONFIGURATION;
+        }
+
+        @Override
+        public OperationType getOperationType() {
+            return OperationType.DESCRIBE;
+        }
+
+        @Override
+        public C2OperationAck handle(C2Operation operation) {
+            return null;
+        }
+    }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java
new file mode 100644
index 0000000000..17b18a0f44
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.FlowIdHolder;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdateConfigurationOperationHandlerTest {
+
+    private static final String FLOW_ID = "flowId";
+    private static final String OPERATION_ID = "operationId";
+    private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "/path/for/the/" + FLOW_ID);
+    private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "incorrect/location");
+
+    @Mock
+    private C2Client client;
+
+    @Mock
+    private FlowIdHolder flowIdHolder;
+
+    @Test
+    void testUpdateConfigurationOperationHandlerCreateSuccess() {
+        UpdateConfigurationOperationHandler underTest = new UpdateConfigurationOperationHandler(null, null, null);
+        // No collaborators are passed in: only the handler's type metadata is read below.
+        assertEquals(OperationType.UPDATE, underTest.getOperationType());
+        assertEquals(OperandType.CONFIGURATION, underTest.getOperandType());
+    }
+
+    @Test
+    void testHandleThrowsExceptionForIncorrectArg() {
+        UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
+        C2Operation operation = new C2Operation();
+        operation.setArgs(INCORRECT_LOCATION_MAP);
+        // assertThrows fails the test by itself when no IllegalStateException is raised; its return value is not needed.
+        assertThrows(IllegalStateException.class, () -> handler.handle(operation));
+    }
+
+    @Test
+    void testHandleReturnsNotAppliedWithNoContent() {
+        when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty());
+        when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+        C2Operation updateOperation = new C2Operation();
+        updateOperation.setArgs(CORRECT_LOCATION_MAP);
+        UpdateConfigurationOperationHandler underTest = new UpdateConfigurationOperationHandler(client, flowIdHolder, null);
+
+        C2OperationAck ack = underTest.handle(updateOperation);
+
+        assertEquals(EMPTY, ack.getOperationId());
+        assertEquals(C2OperationState.OperationState.NOT_APPLIED, ack.getOperationState().getState());
+    }
+
+    @Test
+    void testHandleReturnsNotAppliedWithContentApplyIssues() {
+        Function<byte[], Boolean> failedToUpdate = x -> false;
+        when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+        when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+        UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate);
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(OPERATION_ID);
+        operation.setArgs(CORRECT_LOCATION_MAP);
+        // The update function reports failure, so the ack must carry the operation id with NOT_APPLIED.
+        C2OperationAck response = handler.handle(operation);
+
+        assertEquals(OPERATION_ID, response.getOperationId());
+        assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
+    }
+
+    @Test
+    void testHandleReturnsFullyApplied() {
+        Function<byte[], Boolean> successUpdate = x -> true;
+        when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+        when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+        UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate);
+        C2Operation operation = new C2Operation();
+        operation.setIdentifier(OPERATION_ID);
+        operation.setArgs(CORRECT_LOCATION_MAP);
+        // The update function reports success, so the ack must carry the operation id with FULLY_APPLIED.
+        C2OperationAck response = handler.handle(operation);
+
+        assertEquals(OPERATION_ID, response.getOperationId());
+        assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState());
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
index 39eef4e78c..66dcd9967d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.Optional;
 import org.apache.nifi.c2.client.C2ClientConfig;
 import org.apache.nifi.c2.client.http.C2HttpClient;
@@ -28,6 +29,8 @@ import org.apache.nifi.c2.client.service.C2ClientService;
 import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
 import org.apache.nifi.c2.client.service.FlowIdHolder;
 import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
 import org.apache.nifi.c2.protocol.api.AgentRepositories;
 import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
 import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
@@ -42,7 +45,6 @@ import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParse
 import org.apache.nifi.manifest.RuntimeManifestService;
 import org.apache.nifi.manifest.StandardRuntimeManifestService;
 import org.apache.nifi.nar.ExtensionManagerHolder;
-import org.apache.nifi.services.FlowService;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.slf4j.Logger;
@@ -66,7 +68,6 @@ public class C2NifiClientService {
 
     private final C2ClientService c2ClientService;
 
-    private final FlowService flowService;
     private final FlowController flowController;
     private final String propertiesDir;
     private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
@@ -75,7 +76,7 @@ public class C2NifiClientService {
     private final RuntimeManifestService runtimeManifestService;
     private final long heartbeatPeriod;
 
-    public C2NifiClientService(final NiFiProperties niFiProperties, final FlowService flowService, final FlowController flowController) {
+    public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
         C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
         FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
         this.propertiesDir = niFiProperties.getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
@@ -86,13 +87,12 @@ public class C2NifiClientService {
             clientConfig.getRuntimeType()
         );
         this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
-        this.flowService = flowService;
         this.flowController = flowController;
+        C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
         this.c2ClientService = new C2ClientService(
-            new C2HttpClient(clientConfig, new C2JacksonSerializer()),
+            client,
             new C2HeartbeatFactory(clientConfig, flowIdHolder),
-            flowIdHolder,
-            this::updateFlowContent
+            new C2OperationService(Arrays.asList(new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent)))
         );
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index c7ddf57a99..1900685d23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -300,7 +300,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
                 final boolean c2Enabled = Boolean.parseBoolean(nifiProperties.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"));
                 if (c2Enabled) {
                     logger.info("C2 enabled, creating a C2 client instance");
-                    c2NifiClientService = new C2NifiClientService(nifiProperties, this, this.controller);
+                    c2NifiClientService = new C2NifiClientService(nifiProperties, this.controller);
                     c2NifiClientService.start();
                 } else {
                     logger.info("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);