You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/06/28 17:02:55 UTC
[nifi] branch main updated: NIFI-9908 C2 refactor and test coverage improvements
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1465c2c629 NIFI-9908 C2 refactor and test coverage improvements
1465c2c629 is described below
commit 1465c2c629e1a56c354292db7859e6a4074dff59
Author: Csaba Bejan <be...@gmail.com>
AuthorDate: Thu Jun 23 12:07:49 2022 +0200
NIFI-9908 C2 refactor and test coverage improvements
This closes #6149
Signed-off-by: David Handermann <ex...@apache.org>
---
c2/c2-client-bundle/c2-client-http/pom.xml | 6 +
.../apache/nifi/c2/client/http/C2HttpClient.java | 76 +++++-----
.../nifi/c2/client/http/C2ServerException.java | 29 ++++
.../nifi/c2/client/http/C2HttpClientTest.java | 156 +++++++++++++++++++++
.../nifi/c2/client/service/C2ClientService.java | 9 +-
.../UpdateConfigurationOperationHandler.java | 13 +-
.../c2/client/service/C2ClientServiceTest.java | 146 +++++++++++++++++++
.../c2/client/service/C2HeartbeatFactoryTest.java | 103 ++++++++++++++
.../service/operation/C2OperationServiceTest.java | 114 +++++++++++++++
.../UpdateConfigurationOperationHandlerTest.java | 119 ++++++++++++++++
.../org/apache/nifi/c2/C2NifiClientService.java | 14 +-
.../nifi/controller/StandardFlowService.java | 2 +-
12 files changed, 729 insertions(+), 58 deletions(-)
diff --git a/c2/c2-client-bundle/c2-client-http/pom.xml b/c2/c2-client-bundle/c2-client-http/pom.xml
index 75076c051e..927c3621b3 100644
--- a/c2/c2-client-bundle/c2-client-http/pom.xml
+++ b/c2/c2-client-bundle/c2-client-http/pom.xml
@@ -47,5 +47,11 @@ limitations under the License.
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
index a641ac6093..0d3b4e6cd3 100644
--- a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2HttpClient.java
@@ -91,6 +91,44 @@ public class C2HttpClient implements C2Client {
return serializer.serialize(heartbeat).flatMap(this::sendHeartbeat);
}
+ @Override
+ public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
+ Optional<byte[]> updateContent = Optional.empty();
+ final Request.Builder requestBuilder = new Request.Builder()
+ .get()
+ .url(flowUpdateUrl);
+ final Request request = requestBuilder.build();
+
+ try (Response response = httpClientReference.get().newCall(request).execute()) {
+ Optional<ResponseBody> body = Optional.ofNullable(response.body());
+
+ if (!response.isSuccessful()) {
+ StringBuilder messageBuilder = new StringBuilder(String.format("Configuration retrieval failed: HTTP %d", response.code()));
+ body.map(Object::toString).ifPresent(messageBuilder::append);
+ throw new C2ServerException(messageBuilder.toString());
+ }
+
+ if (body.isPresent()) {
+ updateContent = Optional.of(body.get().bytes());
+ } else {
+ logger.warn("No body returned when pulling a new configuration");
+ }
+ } catch (Exception e) {
+ logger.warn("Configuration retrieval failed", e);
+ }
+
+ return updateContent;
+ }
+
+ @Override
+ public void acknowledgeOperation(C2OperationAck operationAck) {
+ logger.info("Acknowledging Operation [{}] C2 URL [{}]", operationAck.getOperationId(), clientConfig.getC2AckUrl());
+ serializer.serialize(operationAck)
+ .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
+ .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
+ .ifPresent(this::sendAck);
+ }
+
private Optional<C2HeartbeatResponse> sendHeartbeat(String heartbeat) {
Optional<C2HeartbeatResponse> c2HeartbeatResponse = Optional.empty();
Request request = new Request.Builder()
@@ -198,44 +236,6 @@ public class C2HttpClient implements C2Client {
}
}
- @Override
- public Optional<byte[]> retrieveUpdateContent(String flowUpdateUrl) {
- final Request.Builder requestBuilder = new Request.Builder()
- .get()
- .url(flowUpdateUrl);
- final Request request = requestBuilder.build();
-
- ResponseBody body;
- try (final Response response = httpClientReference.get().newCall(request).execute()) {
- int code = response.code();
- if (code >= 400) {
- final String message = String.format("Configuration retrieval failed: HTTP %d %s", code, response.body().string());
- throw new IOException(message);
- }
-
- body = response.body();
-
- if (body == null) {
- logger.warn("No body returned when pulling a new configuration");
- return Optional.empty();
- }
-
- return Optional.of(body.bytes());
- } catch (Exception e) {
- logger.warn("Configuration retrieval failed", e);
- return Optional.empty();
- }
- }
-
- @Override
- public void acknowledgeOperation(C2OperationAck operationAck) {
- logger.info("Performing acknowledgement request to {} for operation {}", clientConfig.getC2AckUrl(), operationAck.getOperationId());
- serializer.serialize(operationAck)
- .map(operationAckBody -> RequestBody.create(operationAckBody, MEDIA_TYPE_APPLICATION_JSON))
- .map(requestBody -> new Request.Builder().post(requestBody).url(clientConfig.getC2AckUrl()).build())
- .ifPresent(this::sendAck);
- }
-
private void sendAck(Request request) {
try(Response heartbeatResponse = httpClientReference.get().newCall(request).execute()) {
if (!heartbeatResponse.isSuccessful()) {
diff --git a/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java
new file mode 100644
index 0000000000..984dd56956
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/src/main/java/org/apache/nifi/c2/client/http/C2ServerException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.http;
+
+import java.io.IOException;
+
+/**
+ * Exception to differentiate C2 specific issues from standard IOExceptions
+ */
+public class C2ServerException extends IOException {
+
+ public C2ServerException(String message) {
+ super(message);
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
new file mode 100644
index 0000000000..aec67b2f0e
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-http/src/test/java/org/apache/nifi/c2/client/http/C2HttpClientTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.http;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Optional;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.serializer.C2Serializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2HttpClientTest {
+
+ private static final String HEARTBEAT_PATH = "c2/heartbeat";
+ private static final String UPDATE_PATH = "c2/update";
+ private static final String ACK_PATH = "c2/acknowledge";
+ private static final int HTTP_STATUS_OK = 200;
+ private static final int HTTP_STATUS_BAD_REQUEST = 400;
+
+ @Mock
+ private C2ClientConfig c2ClientConfig;
+
+ @Mock
+ private C2Serializer serializer;
+
+ @InjectMocks
+ private C2HttpClient c2HttpClient;
+
+ private MockWebServer mockWebServer;
+
+ private String baseUrl;
+
+ @BeforeEach
+ public void startServer() {
+ mockWebServer = new MockWebServer();
+ baseUrl = mockWebServer.url("/").newBuilder().host("localhost").build().toString();
+ }
+
+ @AfterEach
+ public void shutdownServer() throws IOException {
+ mockWebServer.shutdown();
+ }
+
+ @Test
+ void testPublishHeartbeatSuccess() throws InterruptedException {
+ C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+ mockWebServer.enqueue(new MockResponse().setBody("responseBody"));
+
+ when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
+ when(serializer.deserialize(any(), any())).thenReturn(Optional.of(hbResponse));
+ when(c2ClientConfig.getC2Url()).thenReturn(baseUrl + HEARTBEAT_PATH);
+
+ Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
+
+ assertTrue(response.isPresent());
+ assertEquals(response.get(), hbResponse);
+
+ RecordedRequest request = mockWebServer.takeRequest();
+ assertEquals("/" + HEARTBEAT_PATH, request.getPath());
+ }
+
+ @Test
+ void testPublishHeartbeatReturnEmptyInCaseOfCommunicationIssue() {
+ when(serializer.serialize(any(C2Heartbeat.class))).thenReturn(Optional.of("Heartbeat"));
+ when(c2ClientConfig.getC2Url()).thenReturn("http://localhost/incorrectPath");
+
+ Optional<C2HeartbeatResponse> response = c2HttpClient.publishHeartbeat(new C2Heartbeat());
+
+ assertFalse(response.isPresent());
+ }
+
+ @Test
+ void testConstructorThrowsExceptionForInvalidKeystoreFilenameAtInitialization() {
+ when(c2ClientConfig.getKeystoreFilename()).thenReturn("incorrectKeystoreFilename");
+
+ IllegalStateException exception = assertThrows(IllegalStateException.class, () -> new C2HttpClient(c2ClientConfig, serializer));
+
+ assertTrue(exception.getMessage().contains("TLS"));
+ }
+
+ @Test
+ void testRetrieveUpdateContentReturnsEmptyWhenServerErrorResponse() throws InterruptedException {
+ mockWebServer.enqueue(new MockResponse().setBody("updateContent").setResponseCode(HTTP_STATUS_BAD_REQUEST));
+
+ Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
+
+ assertFalse(response.isPresent());
+
+ RecordedRequest request = mockWebServer.takeRequest();
+ assertEquals("/" + UPDATE_PATH, request.getPath());
+ }
+
+ @Test
+ void testRetrieveUpdateContentReturnsResponseWithBody() throws InterruptedException {
+ String content = "updateContent";
+ mockWebServer.enqueue(new MockResponse().setBody(content).setResponseCode(HTTP_STATUS_OK));
+
+ Optional<byte[]> response = c2HttpClient.retrieveUpdateContent(baseUrl + UPDATE_PATH);
+
+ assertTrue(response.isPresent());
+ assertArrayEquals(content.getBytes(StandardCharsets.UTF_8), response.get());
+
+ RecordedRequest request = mockWebServer.takeRequest();
+ assertEquals("/" + UPDATE_PATH, request.getPath());
+ }
+
+ @Test
+ void testAcknowledgeOperation() throws InterruptedException {
+ String ackContent = "ack";
+ when(c2ClientConfig.getC2AckUrl()).thenReturn(baseUrl + ACK_PATH);
+ when(serializer.serialize(any(C2OperationAck.class))).thenReturn(Optional.of(ackContent));
+ mockWebServer.enqueue(new MockResponse().setResponseCode(HTTP_STATUS_OK));
+
+ c2HttpClient.acknowledgeOperation(new C2OperationAck());
+
+ RecordedRequest request = mockWebServer.takeRequest();
+ assertEquals("/" + ACK_PATH, request.getPath());
+ assertTrue(request.getHeader("Content-Type").contains("application/json"));
+ assertArrayEquals(ackContent.getBytes(StandardCharsets.UTF_8), request.getBody().readByteArray());
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
index a36b7bf3fa..c9424c67c2 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/C2ClientService.java
@@ -16,13 +16,10 @@
*/
package org.apache.nifi.c2.client.service;
-import java.util.Arrays;
import java.util.List;
-import java.util.function.Function;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
import org.apache.nifi.c2.client.service.operation.C2OperationService;
-import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -36,13 +33,11 @@ public class C2ClientService {
private final C2Client client;
private final C2HeartbeatFactory c2HeartbeatFactory;
private final C2OperationService operationService;
- private final UpdateConfigurationOperationHandler updateConfigurationOperationHandler;
- public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, FlowIdHolder flowIdHolder, Function<byte[], Boolean> updateFlow) {
+ public C2ClientService(C2Client client, C2HeartbeatFactory c2HeartbeatFactory, C2OperationService operationService) {
this.client = client;
this.c2HeartbeatFactory = c2HeartbeatFactory;
- this.updateConfigurationOperationHandler = new UpdateConfigurationOperationHandler(client, flowIdHolder, updateFlow);
- this.operationService = new C2OperationService(Arrays.asList(updateConfigurationOperationHandler));
+ this.operationService = operationService;
}
public void sendHeartbeat(RuntimeInfoWrapper runtimeInfoWrapper) {
diff --git a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
index b58e07547e..a597abe189 100644
--- a/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
+++ b/c2/c2-client-bundle/c2-client-service/src/main/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandler.java
@@ -23,6 +23,8 @@ import static org.apache.nifi.c2.protocol.api.OperationType.UPDATE;
import java.net.URI;
import java.util.Optional;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.nifi.c2.client.api.C2Client;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.protocol.api.C2Operation;
@@ -36,8 +38,9 @@ import org.slf4j.LoggerFactory;
public class UpdateConfigurationOperationHandler implements C2OperationHandler {
private static final Logger logger = LoggerFactory.getLogger(UpdateConfigurationOperationHandler.class);
+ private static final Pattern FLOW_ID_PATTERN = Pattern.compile("/[^/]+?/[^/]+?/[^/]+?/([^/]+)?/?.*");
- private static final String LOCATION = "location";
+ static final String LOCATION = "location";
private final C2Client client;
private final Function<byte[], Boolean> updateFlow;
@@ -101,10 +104,10 @@ public class UpdateConfigurationOperationHandler implements C2OperationHandler {
private String parseFlowId(String flowUpdateUrl) {
try {
URI flowUri = new URI(flowUpdateUrl);
- String flowUriPath = flowUri.getPath();
- String[] split = flowUriPath.split("/");
- if (split.length > 4) {
- return split[4];
+ Matcher matcher = FLOW_ID_PATTERN.matcher(flowUri.getPath());
+
+ if (matcher.matches()) {
+ return matcher.group(1);
} else {
throw new IllegalArgumentException(String.format("Flow Update URL format unexpected [%s]", flowUpdateUrl));
}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
new file mode 100644
index 0000000000..37bd572dd1
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2ClientServiceTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2ClientServiceTest {
+
+ @Mock
+ private C2Client client;
+
+ @Mock
+ private C2HeartbeatFactory c2HeartbeatFactory;
+
+ @Mock
+ private C2OperationService operationService;
+
+ @Mock
+ private RuntimeInfoWrapper runtimeInfoWrapper;
+
+ @InjectMocks
+ private C2ClientService c2ClientService;
+
+ @Test
+ void testSendHeartbeatAndAckWhenOperationPresent() {
+ C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+ when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+ C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+ hbResponse.setRequestedOperations(generateOperation(1));
+ when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+ when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(any());
+ verify(client).publishHeartbeat(heartbeat);
+ verify(operationService).handleOperation(any());
+ verify(client).acknowledgeOperation(any());
+ }
+
+ @Test
+ void testSendHeartbeatAndAckForMultipleOperationPresent() {
+ int operationNum = 5;
+ C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+ when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+ C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+ hbResponse.setRequestedOperations(generateOperation(operationNum));
+ when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+ when(operationService.handleOperation(any())).thenReturn(Optional.of(new C2OperationAck()));
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(any());
+ verify(client).publishHeartbeat(heartbeat);
+ verify(operationService, times(operationNum)).handleOperation(any());
+ verify(client, times(operationNum)).acknowledgeOperation(any());
+ }
+
+ @Test
+ void testSendHeartbeatHandlesNoHeartbeatResponse() {
+ C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+ when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+ when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.empty());
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(any());
+ verify(client).publishHeartbeat(heartbeat);
+ verify(operationService, times(0)).handleOperation(any());
+ verify(client, times(0)).acknowledgeOperation(any());
+ }
+
+ @Test
+ void testSendHeartbeatNotHandledWhenThereAreNoOperationsSent() {
+ C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+ when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+ C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+ when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(any());
+ verify(client).publishHeartbeat(heartbeat);
+ verify(operationService, times(0)).handleOperation(any());
+ verify(client, times(0)).acknowledgeOperation(any());
+ }
+
+ @Test
+ void testSendHeartbeatNotAckWhenOperationAckMissing() {
+ C2Heartbeat heartbeat = mock(C2Heartbeat.class);
+ when(c2HeartbeatFactory.create(any())).thenReturn(heartbeat);
+ C2HeartbeatResponse hbResponse = new C2HeartbeatResponse();
+ hbResponse.setRequestedOperations(generateOperation(1));
+ when(client.publishHeartbeat(heartbeat)).thenReturn(Optional.of(hbResponse));
+ when(operationService.handleOperation(any())).thenReturn(Optional.empty());
+
+ c2ClientService.sendHeartbeat(runtimeInfoWrapper);
+
+ verify(c2HeartbeatFactory).create(any());
+ verify(client).publishHeartbeat(heartbeat);
+ verify(operationService).handleOperation(any());
+ verify(client, times(0)).acknowledgeOperation(any());
+ }
+
+ private List<C2Operation> generateOperation(int num) {
+ return IntStream.range(0, num)
+ .mapToObj(x -> new C2Operation())
+ .collect(Collectors.toList());
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
new file mode 100644
index 0000000000..111d750db5
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/C2HeartbeatFactoryTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.nifi.c2.client.C2ClientConfig;
+import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.protocol.api.AgentRepositories;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
+import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class C2HeartbeatFactoryTest {
+
+ private static final String AGENT_CLASS = "agentClass";
+ private static final String FLOW_ID = "flowId";
+
+ @Mock
+ private C2ClientConfig clientConfig;
+
+ @Mock
+ private FlowIdHolder flowIdHolder;
+
+ @InjectMocks
+ private C2HeartbeatFactory c2HeartbeatFactory;
+
+ @TempDir
+ private File tempDir;
+
+ @BeforeEach
+ public void setup() {
+ when(clientConfig.getConfDirectory()).thenReturn(tempDir.getAbsolutePath());
+ }
+
+ @Test
+ void testCreateHeartbeat() {
+ when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+ when(clientConfig.getAgentClass()).thenReturn(AGENT_CLASS);
+
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
+
+ assertEquals(FLOW_ID, heartbeat.getFlowId());
+ assertEquals(AGENT_CLASS, heartbeat.getAgentClass());
+ }
+
+ @Test
+ void testCreateGeneratesAgentAndDeviceIdIfNotPresent() {
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class));
+
+ assertNotNull(heartbeat.getAgentId());
+ assertNotNull(heartbeat.getDeviceId());
+ }
+
+ @Test
+ void testCreatePopulatesFromRuntimeInfoWrapper() {
+ AgentRepositories repos = new AgentRepositories();
+ RuntimeManifest manifest = new RuntimeManifest();
+ Map<String, FlowQueueStatus> queueStatus = new HashMap<>();
+
+ C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus));
+
+ assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
+ assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
+ assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
+ }
+
+ @Test
+ void testCreateThrowsExceptionWhenConfDirNotSet() {
+ when(clientConfig.getConfDirectory()).thenReturn(String.class.getSimpleName());
+
+ assertThrows(IllegalStateException.class, () -> c2HeartbeatFactory.create(mock(RuntimeInfoWrapper.class)));
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
new file mode 100644
index 0000000000..0249369e27
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/C2OperationServiceTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class C2OperationServiceTest {
+
+ private static C2OperationAck operationAck;
+
+ @BeforeAll
+ public static void setup(){
+ operationAck = new C2OperationAck();
+ operationAck.setOperationId("12345");
+ }
+
+ @Test
+ void testHandleOperationReturnsEmptyForUnrecognisedOperationType() {
+ C2OperationService service = new C2OperationService(Collections.emptyList());
+
+ C2Operation operation = new C2Operation();
+ operation.setOperation(OperationType.UPDATE);
+ operation.setOperand(OperandType.CONFIGURATION);
+ Optional<C2OperationAck> ack = service.handleOperation(operation);
+
+ assertFalse(ack.isPresent());
+ }
+
+ @Test
+ void testHandleOperation() {
+ C2OperationService service = new C2OperationService(Collections.singletonList(new TestDescribeOperationHandler()));
+
+ C2Operation operation = new C2Operation();
+ operation.setOperation(OperationType.DESCRIBE);
+ operation.setOperand(OperandType.MANIFEST);
+ Optional<C2OperationAck> ack = service.handleOperation(operation);
+
+ assertTrue(ack.isPresent());
+ assertEquals(operationAck, ack.get());
+ }
+
+ @Test
+ void testHandleOperationReturnsEmptyForOperandMismatch() {
+ C2OperationService service = new C2OperationService(Collections.singletonList(new TestInvalidOperationHandler()));
+
+ C2Operation operation = new C2Operation();
+ operation.setOperation(OperationType.DESCRIBE);
+ operation.setOperand(OperandType.MANIFEST);
+ Optional<C2OperationAck> ack = service.handleOperation(operation);
+
+ assertFalse(ack.isPresent());
+ }
+
+ private static class TestDescribeOperationHandler implements C2OperationHandler {
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.DESCRIBE;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.MANIFEST;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ return operationAck;
+ }
+ }
+
+ private static class TestInvalidOperationHandler implements C2OperationHandler {
+
+ @Override
+ public OperationType getOperationType() {
+ return OperationType.DESCRIBE;
+ }
+
+ @Override
+ public OperandType getOperandType() {
+ return OperandType.CONFIGURATION;
+ }
+
+ @Override
+ public C2OperationAck handle(C2Operation operation) {
+ return null;
+ }
+ }
+}
diff --git a/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java
new file mode 100644
index 0000000000..17b18a0f44
--- /dev/null
+++ b/c2/c2-client-bundle/c2-client-service/src/test/java/org/apache/nifi/c2/client/service/operation/UpdateConfigurationOperationHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.c2.client.service.operation;
+
+import static org.apache.commons.lang3.StringUtils.EMPTY;
+import static org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler.LOCATION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.nifi.c2.client.api.C2Client;
+import org.apache.nifi.c2.client.service.FlowIdHolder;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class UpdateConfigurationOperationHandlerTest {
+
+ private static final String FLOW_ID = "flowId";
+ private static final String OPERATION_ID = "operationId";
+ private static final Map<String, String> CORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "/path/for/the/" + FLOW_ID);
+ private static final Map<String, String> INCORRECT_LOCATION_MAP = Collections.singletonMap(LOCATION, "incorrect/location");
+
+ @Mock
+ private C2Client client;
+
+ @Mock
+ private FlowIdHolder flowIdHolder;
+
+ @Test
+ void testUpdateConfigurationOperationHandlerCreateSuccess() {
+ UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
+
+ assertEquals(OperationType.UPDATE, handler.getOperationType());
+ assertEquals(OperandType.CONFIGURATION, handler.getOperandType());
+ }
+
+ @Test
+ void testHandleThrowsExceptionForIncorrectArg() {
+ UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(null, null, null);
+ C2Operation operation = new C2Operation();
+ operation.setArgs(INCORRECT_LOCATION_MAP);
+
+ IllegalStateException exception = assertThrows(IllegalStateException.class, () -> handler.handle(operation));
+ }
+
+ @Test
+ void testHandleReturnsNotAppliedWithNoContent() {
+ when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+ when(client.retrieveUpdateContent(any())).thenReturn(Optional.empty());
+ UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, null);
+ C2Operation operation = new C2Operation();
+ operation.setArgs(CORRECT_LOCATION_MAP);
+
+ C2OperationAck response = handler.handle(operation);
+
+ assertEquals(EMPTY, response.getOperationId());
+ assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
+ }
+
+ @Test
+ void testHandleReturnsNotAppliedWithContentApplyIssues() {
+ Function<byte[], Boolean> failedToUpdate = x -> false;
+ when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+ when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
+ UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, failedToUpdate);
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(OPERATION_ID);
+ operation.setArgs(CORRECT_LOCATION_MAP);
+
+ C2OperationAck response = handler.handle(operation);
+
+ assertEquals(OPERATION_ID, response.getOperationId());
+ assertEquals(C2OperationState.OperationState.NOT_APPLIED, response.getOperationState().getState());
+ }
+
+ @Test
+ void testHandleReturnsFullyApplied() {
+ Function<byte[], Boolean> successUpdate = x -> true;
+ when(flowIdHolder.getFlowId()).thenReturn(FLOW_ID);
+ when(client.retrieveUpdateContent(any())).thenReturn(Optional.of("content".getBytes()));
+ UpdateConfigurationOperationHandler handler = new UpdateConfigurationOperationHandler(client, flowIdHolder, successUpdate);
+ C2Operation operation = new C2Operation();
+ operation.setIdentifier(OPERATION_ID);
+ operation.setArgs(CORRECT_LOCATION_MAP);
+
+ C2OperationAck response = handler.handle(operation);
+
+ assertEquals(OPERATION_ID, response.getOperationId());
+ assertEquals(C2OperationState.OperationState.FULLY_APPLIED, response.getOperationState().getState());
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
index 39eef4e78c..66dcd9967d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/c2/C2NifiClientService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
import java.util.Optional;
import org.apache.nifi.c2.client.C2ClientConfig;
import org.apache.nifi.c2.client.http.C2HttpClient;
@@ -28,6 +29,8 @@ import org.apache.nifi.c2.client.service.C2ClientService;
import org.apache.nifi.c2.client.service.C2HeartbeatFactory;
import org.apache.nifi.c2.client.service.FlowIdHolder;
import org.apache.nifi.c2.client.service.model.RuntimeInfoWrapper;
+import org.apache.nifi.c2.client.service.operation.C2OperationService;
+import org.apache.nifi.c2.client.service.operation.UpdateConfigurationOperationHandler;
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentRepositoryStatus;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
@@ -42,7 +45,6 @@ import org.apache.nifi.extension.manifest.parser.jaxb.JAXBExtensionManifestParse
import org.apache.nifi.manifest.RuntimeManifestService;
import org.apache.nifi.manifest.StandardRuntimeManifestService;
import org.apache.nifi.nar.ExtensionManagerHolder;
-import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -66,7 +68,6 @@ public class C2NifiClientService {
private final C2ClientService c2ClientService;
- private final FlowService flowService;
private final FlowController flowController;
private final String propertiesDir;
private final ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
@@ -75,7 +76,7 @@ public class C2NifiClientService {
private final RuntimeManifestService runtimeManifestService;
private final long heartbeatPeriod;
- public C2NifiClientService(final NiFiProperties niFiProperties, final FlowService flowService, final FlowController flowController) {
+ public C2NifiClientService(final NiFiProperties niFiProperties, final FlowController flowController) {
C2ClientConfig clientConfig = generateClientConfig(niFiProperties);
FlowIdHolder flowIdHolder = new FlowIdHolder(clientConfig.getConfDirectory());
this.propertiesDir = niFiProperties.getProperty(NiFiProperties.PROPERTIES_FILE_PATH, null);
@@ -86,13 +87,12 @@ public class C2NifiClientService {
clientConfig.getRuntimeType()
);
this.heartbeatPeriod = clientConfig.getHeartbeatPeriod();
- this.flowService = flowService;
this.flowController = flowController;
+ C2HttpClient client = new C2HttpClient(clientConfig, new C2JacksonSerializer());
this.c2ClientService = new C2ClientService(
- new C2HttpClient(clientConfig, new C2JacksonSerializer()),
+ client,
new C2HeartbeatFactory(clientConfig, flowIdHolder),
- flowIdHolder,
- this::updateFlowContent
+ new C2OperationService(Arrays.asList(new UpdateConfigurationOperationHandler(client, flowIdHolder, this::updateFlowContent)))
);
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index c7ddf57a99..1900685d23 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -300,7 +300,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
final boolean c2Enabled = Boolean.parseBoolean(nifiProperties.getProperty(C2NiFiProperties.C2_ENABLE_KEY, "false"));
if (c2Enabled) {
logger.info("C2 enabled, creating a C2 client instance");
- c2NifiClientService = new C2NifiClientService(nifiProperties, this, this.controller);
+ c2NifiClientService = new C2NifiClientService(nifiProperties, this.controller);
c2NifiClientService.start();
} else {
logger.info("C2 Property [{}] missing or disabled: C2 client not created", C2NiFiProperties.C2_ENABLE_KEY);